Image processing in TensorFlow TFX pipelines

I am trying to get a Tensorflow TFX pipeline up and running using the MNIST dataset.

# Imports
import pandas as pd
import numpy as np
from keras.datasets import mnist
import tensorflow as tf
from tfx import v1 as tfx
import os
from tfx.components import ImportExampleGen

from platform import python_version
python_version() #'3.8.8'

# Load the data - 60,000 training examples and 10,000 testing examples.
# Presumably *_x holds the 28x28 image arrays and *_y the matching labels.
(train_x, train_y), (test_x, test_y) = mnist.load_data()

Set up the pipeline paths:

# Root directories for the pipeline artifacts and for the TFRecord input data.
_pipeline_root = './pipeline'
_data_root = './data'

# Create each directory independently. The previous combined
# "neither exists" check skipped creation whenever one of the two was already
# present, and the `!mkdir` shell magic only works inside IPython.
os.makedirs(_pipeline_root, exist_ok=True)
os.makedirs(_data_root, exist_ok=True)

Write the data to TFRecord format and save it in the eval and train directories. Note that each MNIST image starts as a 28x28 NumPy array and is converted to a byte string so that it can be encoded as part of the TFRecord.


def _bytes_feature(value):
    """Wrap a single string / bytes value in a bytes_list ``tf.train.Feature``.

    If ``value`` is an eager tensor (same type as ``tf.constant(0)``), its
    ``.numpy()`` content is used instead of the tensor object itself.
    """
    eager_tensor_type = type(tf.constant(0))
    # Unwrap eager tensors to their underlying value before building the list.
    payload = value.numpy() if isinstance(value, eager_tensor_type) else value
    bytes_list = tf.train.BytesList(value=[payload])
    return tf.train.Feature(bytes_list=bytes_list)

def _int64_feature(value):
    """Wrap a single bool / enum / int / uint in an int64_list ``tf.train.Feature``."""
    int64_list = tf.train.Int64List(value=[value])
    return tf.train.Feature(int64_list=int64_list)



def serialize_array(array):
    """Return ``array`` serialized with ``tf.io.serialize_tensor``."""
    return tf.io.serialize_tensor(array)

def image_label_to_tf_train(image, label):
    """Build a ``tf.train.Example`` describing one image/label pair.

    The example carries four features: ``height`` and ``width`` (the first two
    entries of the image's shape), ``raw_image`` (the image serialized via
    ``serialize_array``) and ``label``.
    """
    dims = np.shape(image)

    # The structure of a single example: one Feature per key.
    feature_map = dict(
        height=_int64_feature(dims[0]),
        width=_int64_feature(dims[1]),
        raw_image=_bytes_feature(serialize_array(image)),
        label=_int64_feature(label),
    )

    # Wrap the individual features into an Example proto.
    features = tf.train.Features(feature=feature_map)
    return tf.train.Example(features=features)

def write_images_to_tfr_short(images, labels, filename: str = "images", folder=""):
    """Write image/label pairs to a single TFRecord file.

    Args:
        images: indexable collection of images; each one is passed to
            ``image_label_to_tf_train``.
        labels: indexable collection of labels, indexed in step with
            ``images``.
        filename: base name of the output file, without extension.
        folder: directory to write into. It is created (including parents) if
            missing. An empty string means the current working directory.

    Returns:
        The number of examples written.
    """
    # Use os.makedirs instead of the IPython-only `!mkdir` magic so the
    # function also works in a plain Python module; skip it for the empty
    # default, which has no directory to create.
    if folder:
        os.makedirs(folder, exist_ok=True)
    path = os.path.join(folder, filename + ".tfrecords")

    count = 0
    # The context manager closes the writer even if serialization fails.
    with tf.io.TFRecordWriter(path) as writer:
        for index, current_image in enumerate(images):
            current_label = labels[index]
            out = image_label_to_tf_train(image=current_image, label=current_label)
            writer.write(out.SerializeToString())
            count += 1

    print(f"Wrote {count} elements to TFRecord")
    return count

The next stage is to call the Transform component, which uses the preprocessing_fn. This function should process all the data; for example, dividing the image array by 255 is a standard preprocessing step. But the image is still a byte string, and I can't for the life of me figure out how to turn it back into an array. Below is what I have tried.

def preprocessing_fn(inputs):
    """tf.transform's callback function for preprocessing inputs.

    Decodes the image feature with ``tf.io.decode_raw`` and casts the label
    feature to int64.

    Args:
        inputs: map from feature keys to raw not-yet-transformed features.
    Returns:
        Map from string feature key to transformed feature operations.
    """
    

    # Initialize outputs dictionary
    outputs = {}
    
    # _IMAGE_KEY and _LABEL_KEY are module-level constants defined elsewhere
    # (not shown here).
    raw_image_dataset = inputs[_IMAGE_KEY]
    
    # NOTE(review): the image bytes were written with tf.io.serialize_tensor,
    # so decode_raw is presumably not the matching decoder (tf.io.parse_tensor
    # looks like the counterpart) -- confirm. Per the reported ValueError, the
    # result has shape (None, 1, None), which tf.Transform rejects because a
    # non-batch dimension is unknown.
    img = tf.io.decode_raw(raw_image_dataset, tf.int64)
    
    
    outputs[_IMAGE_KEY] = img
  
    
    # The label is passed through unchanged apart from the cast to int64.
    outputs[_LABEL_KEY] = tf.cast(inputs[_LABEL_KEY], tf.int64)



    return outputs

I get the following error:

WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Tuple[Dict[str, Union[NoneType, _Dataset]], Union[Dict[str, Dict[str, PCollection]], NoneType], int] instead.
WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Tuple[Dict[str, Union[NoneType, _Dataset]], Union[Dict[str, Dict[str, PCollection]], NoneType], int] instead.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.8 interpreter.
INFO:tensorflow:Assets written to: ./pipeline/Transform/transform_graph/225/.temp_path/tftransform_tmp/26150ae80de847fab932efeb0f0c610f/assets
INFO:tensorflow:Assets written to: ./pipeline/Transform/transform_graph/225/.temp_path/tftransform_tmp/26150ae80de847fab932efeb0f0c610f/assets
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()

/opt/conda/lib/python3.8/site-packages/apache_beam/transforms/core.py in <lambda>(x, *args, **kwargs)
   1636   if fn_takes_side_inputs(fn):
-> 1637     wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
   1638   else:

/opt/conda/lib/python3.8/site-packages/tensorflow_transform/beam/impl.py in _create_v2_saved_model(tensor_replacement_map, base_temp_dir, preprocessing_fn, input_signature, baseline_analyzers_fingerprint, output_keys_to_name_map)
    662   saved_model_dir = beam_common.get_unique_temp_path(base_temp_dir)
--> 663   impl_helper.trace_and_write_v2_saved_model(saved_model_dir, preprocessing_fn,
    664                                              input_signature, base_temp_dir,

/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in trace_and_write_v2_saved_model(saved_model_dir, preprocessing_fn, input_signature, base_temp_dir, baseline_analyzers_fingerprint, tensor_replacement_map, output_keys_to_name_map)
    893       analyzer_nodes.TENSOR_REPLACEMENTS):
--> 894     metadata = _trace_and_get_metadata(concrete_transform_fn, structured_inputs,
    895                                        preprocessing_fn, base_temp_dir,

/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in _trace_and_get_metadata(concrete_transform_fn, structured_inputs, preprocessing_fn, base_temp_dir, tensor_replacement_map)
    805   return dataset_metadata.DatasetMetadata(
--> 806       schema=schema_inference.infer_feature_schema_v2(
    807           concrete_transform_fn.structured_outputs,

/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in infer_feature_schema_v2(features, concrete_metadata_fn, evaluate_schema_overrides)
    255         metadata)
--> 256   return _infer_feature_schema_common(
    257       features,

/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in _infer_feature_schema_common(features, tensor_ranges, feature_annotations, global_annotations, is_evaluation_complete)
    300           min=min_value, max=max_value, is_categorical=True)
--> 301   feature_spec = _feature_spec_from_batched_tensors(features,
    302                                                     is_evaluation_complete)

/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in _feature_spec_from_batched_tensors(tensors, is_evaluation_complete)
    128           dim is None for dim in shape.as_list()[1:]):
--> 129         raise ValueError(
    130             'Feature {} ({}) had invalid shape {} for FixedLenFeature: apart '

ValueError: Feature raw_image (Tensor("Identity_1:0", shape=(None, 1, None), dtype=int64)) had invalid shape (None, 1, None) for FixedLenFeature: apart from the batch dimension, all dimensions must have known size

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
<ipython-input-37-7beafa4fe436> in <module>
      3     schema=schema_gen.outputs['schema'],
      4     module_file=os.path.abspath(_mnist_transform_module))
----> 5 context.run(transform, enable_cache=False)

/opt/conda/lib/python3.8/site-packages/tfx/orchestration/experimental/interactive/interactive_context.py in run_if_ipython(*args, **kwargs)
     61       # __IPYTHON__ variable is set by IPython, see
     62       # https://ipython.org/ipython-doc/rel-0.10.2/html/interactive/reference.html#embedding-ipython.
---> 63       return fn(*args, **kwargs)
     64     else:
     65       absl.logging.warning(

/opt/conda/lib/python3.8/site-packages/tfx/orchestration/experimental/interactive/interactive_context.py in run(self, component, enable_cache, beam_pipeline_args)
    181         telemetry_utils.LABEL_TFX_RUNNER: runner_label,
    182     }):
--> 183       execution_id = launcher.launch().execution_id
    184 
    185     return execution_result.ExecutionResult(

/opt/conda/lib/python3.8/site-packages/tfx/orchestration/launcher/base_component_launcher.py in launch(self)
    198       # be immutable in this context.
    199       # output_dict can still be changed, specifically properties.
--> 200       self._run_executor(execution_decision.execution_id,
    201                          copy.deepcopy(execution_decision.input_dict),
    202                          execution_decision.output_dict,

/opt/conda/lib/python3.8/site-packages/tfx/orchestration/launcher/in_process_component_launcher.py in _run_executor(self, execution_id, input_dict, output_dict, exec_properties)
     71     # be immutable in this context.
     72     # output_dict can still be changed, specifically properties.
---> 73     executor.Do(
     74         copy.deepcopy(input_dict), output_dict, copy.deepcopy(exec_properties))

/opt/conda/lib/python3.8/site-packages/tfx/components/transform/executor.py in Do(self, input_dict, output_dict, exec_properties)
    581     # remove the `_pip_dependencies` attribute.
    582     with udf_utils.TempPipInstallContext(self._pip_dependencies):
--> 583       TransformProcessor().Transform(label_inputs, label_outputs, status_file)
    584     logging.debug('Cleaning up temp path %s on executor success', temp_path)
    585     io_utils.delete_dir(temp_path)

/opt/conda/lib/python3.8/site-packages/tfx/components/transform/executor.py in Transform(***failed resolving arguments***)
   1114     materialization_format = (
   1115         transform_paths_file_formats[-1] if materialize_output_paths else None)
-> 1116     self._RunBeamImpl(analyze_data_list, transform_data_list, preprocessing_fn,
   1117                       stats_options_updater_fn, force_tf_compat_v1,
   1118                       input_dataset_metadata, transform_output_path,

/opt/conda/lib/python3.8/site-packages/tfx/components/transform/executor.py in _RunBeamImpl(self, analyze_data_list, transform_data_list, preprocessing_fn, stats_options_updater_fn, force_tf_compat_v1, input_dataset_metadata, transform_output_path, raw_examples_data_format, temp_path, input_cache_dir, output_cache_dir, disable_statistics, per_set_stats_output_paths, materialization_format, analyze_paths_count, stats_output_paths, make_beam_pipeline_fn)
   1496             for dataset in transform_data_list:
   1497               infix = 'TransformIndex{}'.format(dataset.index)
-> 1498               (dataset.transformed
   1499                | 'EncodeAndSerialize[{}]'.format(infix) >> beam.ParDo(
   1500                    self._RecordBatchToExamplesFn(transformed_schema_proto))

/opt/conda/lib/python3.8/site-packages/apache_beam/pipeline.py in __exit__(self, exc_type, exc_val, exc_tb)
    594     try:
    595       if not exc_type:
--> 596         self.result = self.run()
    597         self.result.wait_until_finish()
    598     finally:

/opt/conda/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
    571         finally:
    572           shutil.rmtree(tmpdir)
--> 573       return self.runner.run_pipeline(self, self._options)
    574     finally:
    575       if not is_in_ipython():

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py in run_pipeline(self, pipeline, options)
    129       runner = BundleBasedDirectRunner()
    130 
--> 131     return runner.run_pipeline(pipeline, options)
    132 
    133 

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_pipeline(self, pipeline, options)
    197         options.view_as(pipeline_options.ProfilingOptions))
    198 
--> 199     self._latest_run_result = self.run_via_runner_api(
    200         pipeline.to_runner_api(default_environment=self._default_environment))
    201     return self._latest_run_result

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_via_runner_api(self, pipeline_proto)
    208     # TODO(pabloem, BEAM-7514): Create a watermark manager (that has access to
    209     #   the teststream (if any), and all the stages).
--> 210     return self.run_stages(stage_context, stages)
    211 
    212   @contextlib.contextmanager

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_stages(self, stage_context, stages)
    393           )
    394 
--> 395           stage_results = self._run_stage(
    396               runner_execution_context, bundle_context_manager)
    397 

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_stage(self, runner_execution_context, bundle_context_manager)
    658     while True:
    659       last_result, deferred_inputs, fired_timers, watermark_updates = (
--> 660           self._run_bundle(
    661               runner_execution_context,
    662               bundle_context_manager,

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_bundle(self, runner_execution_context, bundle_context_manager, data_input, data_output, input_timers, expected_timer_output, bundle_manager)
    781         expected_timer_output)
    782 
--> 783     result, splits = bundle_manager.process_bundle(
    784         data_input, data_output, input_timers, expected_timer_output)
    785     # Now we collect all the deferred inputs remaining from bundle execution.

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in process_bundle(self, inputs, expected_outputs, fired_timers, expected_output_timers, dry_run)
   1092             process_bundle_descriptor.id,
   1093             cache_tokens=[next(self._cache_token_generator)]))
-> 1094     result_future = self._worker_handler.control_conn.push(process_bundle_req)
   1095 
   1096     split_results = []  # type: List[beam_fn_api_pb2.ProcessBundleSplitResponse]

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py in push(self, request)
    376       self._uid_counter += 1
    377       request.instruction_id = 'control_%s' % self._uid_counter
--> 378     response = self.worker.do_instruction(request)
    379     return ControlFuture(request.instruction_id, response)
    380 

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py in do_instruction(self, request)
    578     if request_type:
    579       # E.g. if register is set, this will call self.register(request.register))
--> 580       return getattr(self, request_type)(
    581           getattr(request, request_type), request.instruction_id)
    582     else:

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py in process_bundle(self, request, instruction_id)
    616         with self.maybe_profile(instruction_id):
    617           delayed_applications, requests_finalization = (
--> 618               bundle_processor.process_bundle(instruction_id))
    619           monitoring_infos = bundle_processor.monitoring_infos()
    620           monitoring_infos.extend(self.state_cache_metrics_fn())

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py in process_bundle(self, instruction_id)
    993                   element.timer_family_id, timer_data)
    994           elif isinstance(element, beam_fn_api_pb2.Elements.Data):
--> 995             input_op_by_transform_id[element.transform_id].process_encoded(
    996                 element.data)
    997 

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py in process_encoded(self, encoded_windowed_values)
    219       decoded_value = self.windowed_coder_impl.decode_from_stream(
    220           input_stream, True)
--> 221       self.output(decoded_value)
    222 
    223   def monitoring_infos(self, transform_id, tag_to_pcollection_id):

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()

/opt/conda/lib/python3.8/site-packages/apache_beam/transforms/core.py in <lambda>(x, *args, **kwargs)
   1635   from apache_beam.transforms.util import fn_takes_side_inputs
   1636   if fn_takes_side_inputs(fn):
-> 1637     wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
   1638   else:
   1639     wrapper = lambda x: [fn(x)]

/opt/conda/lib/python3.8/site-packages/tensorflow_transform/beam/impl.py in _create_v2_saved_model(tensor_replacement_map, base_temp_dir, preprocessing_fn, input_signature, baseline_analyzers_fingerprint, output_keys_to_name_map)
    661   """
    662   saved_model_dir = beam_common.get_unique_temp_path(base_temp_dir)
--> 663   impl_helper.trace_and_write_v2_saved_model(saved_model_dir, preprocessing_fn,
    664                                              input_signature, base_temp_dir,
    665                                              baseline_analyzers_fingerprint,

/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in trace_and_write_v2_saved_model(saved_model_dir, preprocessing_fn, input_signature, base_temp_dir, baseline_analyzers_fingerprint, tensor_replacement_map, output_keys_to_name_map)
    892   if not concrete_transform_fn.graph.get_collection(
    893       analyzer_nodes.TENSOR_REPLACEMENTS):
--> 894     metadata = _trace_and_get_metadata(concrete_transform_fn, structured_inputs,
    895                                        preprocessing_fn, base_temp_dir,
    896                                        tensor_replacement_map)

/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in _trace_and_get_metadata(concrete_transform_fn, structured_inputs, preprocessing_fn, base_temp_dir, tensor_replacement_map)
    804       evaluate_schema_overrides=True)
    805   return dataset_metadata.DatasetMetadata(
--> 806       schema=schema_inference.infer_feature_schema_v2(
    807           concrete_transform_fn.structured_outputs,
    808           concrete_metadata_fn,

/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in infer_feature_schema_v2(features, concrete_metadata_fn, evaluate_schema_overrides)
    254     tensor_annotations, global_annotations = _get_schema_annotations_v2(
    255         metadata)
--> 256   return _infer_feature_schema_common(
    257       features,
    258       tensor_ranges,

/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in _infer_feature_schema_common(features, tensor_ranges, feature_annotations, global_annotations, is_evaluation_complete)
    299       domains[name] = schema_pb2.IntDomain(
    300           min=min_value, max=max_value, is_categorical=True)
--> 301   feature_spec = _feature_spec_from_batched_tensors(features,
    302                                                     is_evaluation_complete)
    303 

/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in _feature_spec_from_batched_tensors(tensors, is_evaluation_complete)
    127       if is_evaluation_complete and any(
    128           dim is None for dim in shape.as_list()[1:]):
--> 129         raise ValueError(
    130             'Feature {} ({}) had invalid shape {} for FixedLenFeature: apart '
    131             'from the batch dimension, all dimensions must have known size'

ValueError: Feature raw_image (Tensor("Identity_1:0", shape=(None, 1, None), dtype=int64)) had invalid shape (None, 1, None) for FixedLenFeature: apart from the batch dimension, all dimensions must have known size [while running 'Analyze/CreateSavedModel[tf_v2_only]/CreateSavedModel']

I know the label feature is working, because I can run the code below and get the printed output shown further down.

# Build the Transform component from the example_gen examples, the schema_gen
# schema and the transform module file (presumably the file containing
# preprocessing_fn), then run it in the interactive context with caching off.
transform = tfx.components.Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=os.path.abspath(_mnist_transform_module))
context.run(transform, enable_cache=False)
# Get the URI of the output artifact representing the transformed examples
train_uri = os.path.join(transform.outputs['transformed_examples'].get()[0].uri, 'Split-train')

# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
                      for name in os.listdir(train_uri)]

# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")

# Decode the first record and print output
for tfrecord in dataset.take(1):
  # Parse the raw record bytes back into a tf.train.Example proto.
  serialized_example = tfrecord.numpy()
  example = tf.train.Example()
  example.ParseFromString(serialized_example)
  print(example)

If I remove the lines:

img = tf.io.decode_raw(raw_image_dataset, tf.int64)
    
    
    outputs[_IMAGE_KEY] = img

I get the following printed output:

features {
  feature {
    key: "label"
    value {
      int64_list {
        value: 5
      }
    }
  }
}

This shows that what I am doing to the label feature is working, but I really can't figure out how to transform the image bytes. Part of the issue is that I'm not completely sure what the format is, as it's just a tensor, which is pretty opaque. Given the label operation, it seems I'm effectively operating on a column of data, but again, I can't figure out the correct operation or syntax.



Solution 1:[1]

For any future viewers, this works:

raw_image_dataset = tf.map_fn(fn = lambda x : tf.io.parse_tensor(x[0], tf.uint8, name=None), elems = raw_image_dataset, fn_output_signature = tf.TensorSpec((28,28),dtype=tf.uint8,    name=None), infer_shape = True)
    raw_image_dataset = tf.cast(raw_image_dataset, tf.int64)
    outputs[_IMAGE_KEY] = raw_image_dataset

Solution 2:[2]

So I think I solved this using

raw_image_dataset = inputs[_IMAGE_KEY]
    
    raw_image_dataset = tf.map_fn(fn = lambda x : tf.io.decode_image(x[0]) , elems = raw_image_dataset, dtype=tf.uint8)

There's something about the data going in as a batch, so it needs to be mapped over, and about using the right component of the resulting tensor ("x[0]"). I'm still not 100% sure why this is the case, but it seems to run.

Now I'm struggling with TFX, as it won't let me output features that are different from what went in...

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 tim_ai
Solution 2 tim_ai