Speed Up your Keras Sequence Pipeline

TL;DR

When using tf.keras.utils.Sequence to generate batches, the data copy overhead between processes can be very high. This leaves worker processes blocked most of the time and slows down batch generation. A common solution is to use shared memory to share data between processes; PyTorch does this. With Python 3.8, you can use shared_memory from the multiprocessing library to achieve the same. Check out the code at the end.

The Problem

TensorFlow has a good tf.data API for creating an input data pipeline. The backend of the API is written in C++, free of the Python GIL, which allows it to truly run code in parallel. You can also insert arbitrary Python/NumPy code into the input pipeline, but this code can become a bottleneck because of the GIL. To avoid this, you can write the pipeline using tf.keras.utils.Sequence. You can then increase workers in model.fit to generate batches in parallel, and set use_multiprocessing=True to free your pipeline from the GIL. But does this work?

Let's start with some code.

The code below creates a dummy Keras Sequence, such that the compute is heavy.

import tensorflow as tf
import numpy as np


class DataSequence(tf.keras.utils.Sequence):

    def __getitem__(self, item):
        # Compute intensive data
        return np.random.rand(256, 224, 224, 3).astype('float32')

    def __len__(self):
        return 128

We train a dummy model using this DataSequence:

if __name__ == '__main__':
    dataset = DataSequence()
    model = tf.keras.Sequential()
    model.compile()
    model.fit(dataset, epochs=1)

Running this code trains a dummy model on the given Sequence. The output is something like this:

Epoch 1/4
128/128 [==============================] - 104s 815ms/step - loss: 0.0000e+00

Now, let's increase the number of workers to 32.

model.fit(dataset, epochs=4, workers=32)

Output:

Epoch 1/4
128/128 [==============================] - 65s 509ms/step - loss: 0.0000e+00

With workers=32 we only got a speedup of 104/65 = ~1.6x. This is bad. What is the cause? Even with 32 workers, the CPU utilization of the process was only ~200% on a 32-core machine. This is because of the almighty Python GIL. How do we bypass it? use_multiprocessing=True

model.fit(dataset, epochs=1, workers=32, use_multiprocessing=True)

Output:


WARNING:tensorflow:multiprocessing can interact badly with TensorFlow, causing nondeterministic deadlocks. For high performance data pipelines tf.data is recommended.
[==============================] - 62s 521ms/step - loss: 0.0000e+00

Even with multiprocessing enabled, you get a speedup of only 104/62 = ~1.7x. What went wrong here? I found that 32 processes were being created, but their utilization was very low. Most of them sat at 0% most of the time.

What is the cause?

The cause is the data transfer between processes. The NumPy array data is transferred between processes via a multiprocessing Pipe, which is slow and a poor fit for this use case. I made a small change to validate this hypothesis: I changed my Sequence to compute the large array but send only a small array, say of shape [224, 224, 3].

128/128 [==============================] - 4s 32ms/step - loss: 0.0000e+00
In [8]: model.fit(dataset, epochs=1, workers=32, use_multiprocessing=True)
WARNING:tensorflow:multiprocessing can interact badly with TensorFlow, causing nondeterministic deadlocks. For high performance data pipelines tf.data is recommended.
128/128 [==============================] - 7s 58ms/step - loss: 0.0000e+00
Out[8]: <tensorflow.python.keras.callbacks.History at 0x7f0cec4de640>
In [9]: model.fit(dataset, epochs=1, workers=32, use_multiprocessing=True, max_queue_size=32)
WARNING:tensorflow:multiprocessing can interact badly with TensorFlow, causing nondeterministic deadlocks. For high performance data pipelines tf.data is recommended.
128/128 [==============================] - 4s 29ms/step - loss: 0.0000e+00
Out[9]: <tensorflow.python.keras.callbacks.History at 0x7f0cec4dc730>
In [10]: model.fit(dataset, epochs=1, workers=32, use_multiprocessing=True, max_queue_size=128)
WARNING:tensorflow:multiprocessing can interact badly with TensorFlow, causing nondeterministic deadlocks. For high performance data pipelines tf.data is recommended.
128/128 [==============================] - 3s 27ms/step - loss: 0.0000e+00
Out[10]: <tensorflow.python.keras.callbacks.History at 0x7f0cec5f9550>
In [11]: model.fit(dataset, epochs=1, workers=32, use_multiprocessing=True, max_queue_size=256)
WARNING:tensorflow:multiprocessing can interact badly with TensorFlow, causing nondeterministic deadlocks. For high performance data pipelines tf.data is recommended.
128/128 [==============================] - 3s 27ms/step - loss: 0.0000e+00
Out[11]: <tensorflow.python.keras.callbacks.History at 0x7f0cf26cc5b0>

As you can see, batch generation is fast now, and the processes showed good utilization. Note that I increased max_queue_size while doing this, to prevent the consumer from blocking the producers.

In a nutshell, data transfer between processes is slow and blocks the worker processes from generating new batches. This mainly happens because the default IPC channel is a Pipe, which requires data to go through the kernel. According to this, a total of 4 copies are made when data is transferred through pipes.
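To put rough numbers on the payload, the sketch below computes how many bytes each batch carries. The helper name batch_nbytes is made up for illustration; the shapes are the ones used in this post. Pickling a NumPy array embeds its raw buffer, so what travels between processes is at least this large.

```python
import pickle

import numpy as np


def batch_nbytes(shape, dtype="float32"):
    """Raw size in bytes of an array with the given shape and dtype."""
    return int(np.prod(shape)) * np.dtype(dtype).itemsize


full_batch = batch_nbytes((256, 224, 224, 3))  # 154_140_672 bytes (147 MiB)
small_batch = batch_nbytes((224, 224, 3))      # 602_112 bytes (~0.6 MiB)

# The pickled form contains the raw buffer plus a small header,
# so it is never smaller than the array's nbytes.
pickled_small = len(pickle.dumps(np.zeros((224, 224, 3), dtype="float32")))

print(full_batch, small_batch, full_batch // small_batch)
print(pickled_small >= small_batch)
```

The full batch is 256 times larger than the small array, which is in line with the gap between the slow and fast runs above.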

Solution: Shared Memory

With Python 3.8, the shared_memory module is available in multiprocessing. With shared memory, you can dump an array into a shared memory block and access that block again from another process. Zero copy. PyTorch uses this in DataLoader when fetching records from multiple processes [Reference: Link]. A similar thing can be achieved in TensorFlow using the code below:
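What follows is a minimal sketch of the idea, not a definitive implementation. It reuses the name shared_mem_multiprocessing from the usage lines further down, but the helper _worker, the max_queue_size parameter, and the choice of the fork start method (so it assumes Linux or another fork-capable platform) are assumptions made for this sketch. It also assumes each item of the Sequence is a single NumPy array. Each worker copies its batch into a freshly created SharedMemory block and sends only the block name, shape, and dtype through the queue; the parent attaches by name, copies the batch out, and unlinks the block. Batches may arrive out of order.

```python
import multiprocessing as mp
from multiprocessing import shared_memory

import numpy as np


def _worker(sequence, index_queue, result_queue):
    """Compute batches and publish each one through a shared memory block."""
    while True:
        idx = index_queue.get()
        if idx is None:  # sentinel: no more work
            return
        batch = np.ascontiguousarray(sequence[idx])
        shm = shared_memory.SharedMemory(create=True, size=max(batch.nbytes, 1))
        # The one copy on the producer side: write the batch into the block.
        view = np.ndarray(batch.shape, dtype=batch.dtype, buffer=shm.buf)
        view[...] = batch
        del view  # drop the buffer export so close() succeeds
        name = shm.name
        shm.close()
        # Only the block name and array metadata go through the queue.
        result_queue.put((name, batch.shape, batch.dtype.str))


def shared_mem_multiprocessing(sequence, workers=2, max_queue_size=10):
    """Yield every batch of `sequence` once, computed by worker processes."""
    ctx = mp.get_context("fork")  # assumption: a platform that supports fork
    # Creating a block starts the resource tracker in the parent, so the
    # forked workers share it instead of each launching their own.
    warmup = shared_memory.SharedMemory(create=True, size=1)
    warmup.close()
    warmup.unlink()

    index_queue = ctx.Queue()
    result_queue = ctx.Queue(maxsize=max_queue_size)
    procs = [
        ctx.Process(target=_worker,
                    args=(sequence, index_queue, result_queue),
                    daemon=True)
        for _ in range(workers)
    ]
    for proc in procs:
        proc.start()
    for idx in range(len(sequence)):
        index_queue.put(idx)
    for _ in procs:
        index_queue.put(None)  # one sentinel per worker

    try:
        for _ in range(len(sequence)):
            name, shape, dtype = result_queue.get()
            shm = shared_memory.SharedMemory(name=name)
            try:
                view = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
                batch = view.copy()  # copy out so the block can be freed now
                del view
            finally:
                shm.close()
                shm.unlink()
            yield batch
    finally:
        for proc in procs:
            proc.terminate()
            proc.join()
```

Note the trade-offs in this sketch. It makes a second, in-process copy on the consumer side and unlinks each block immediately, which keeps cleanup simple but is not strictly zero-copy; it still avoids pickling the whole batch through a Pipe. It yields each batch once, so training for several epochs would need a fresh generator per epoch or an outer loop. If iteration stops early, blocks still waiting in the queue are not unlinked by this code.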

The above function takes a Keras Sequence and returns a generator. The generator can be consumed by model.fit to train the model.

gen = shared_mem_multiprocessing(dataset, workers=32)
model.fit(gen, epochs=4)

Finishing up

So, I have shown you how to use shared memory to transfer data between processes, specifically how to transfer NumPy arrays between processes without copying data. Well, technically there is one copy, when the data is copied into shared memory. This copy can also be avoided if you create the data in shared memory in the first place. I’ll leave that optimization to the user.
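As a rough sketch of that optimization, a batch can be generated straight into a shared block instead of being copied into it afterwards. The function names fill_shared_batch and attach_shared_batch are hypothetical, and the random data stands in for real preprocessing; the sketch relies on numpy.random.Generator.random accepting an out array.

```python
from multiprocessing import shared_memory

import numpy as np


def fill_shared_batch(shape, seed=0):
    """Allocate a shared block and generate a float32 batch directly in it."""
    nbytes = int(np.prod(shape)) * np.dtype("float32").itemsize
    shm = shared_memory.SharedMemory(create=True, size=nbytes)
    batch = np.ndarray(shape, dtype="float32", buffer=shm.buf)
    # Write the random values into the shared buffer, with no temporary array.
    np.random.default_rng(seed).random(out=batch, dtype=np.float32)
    return shm, batch


def attach_shared_batch(name, shape):
    """Map an existing block (normally from another process) as an array."""
    shm = shared_memory.SharedMemory(name=name)
    return shm, np.ndarray(shape, dtype="float32", buffer=shm.buf)


if __name__ == "__main__":
    owner, batch = fill_shared_batch((4, 224, 224, 3))
    peer, view = attach_shared_batch(owner.name, batch.shape)
    print(np.array_equal(batch, view))
    # Release the array views before closing, then unlink exactly once.
    del batch, view
    peer.close()
    owner.close()
    owner.unlink()
```

The arrays must be released before close() is called, because a block cannot be closed while a NumPy view still references its buffer.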

The code shown above is unsafe to use as-is because it does not remove the shared memory blocks (a caveat of handling memory manually). You can use the code below for that; it takes care of removing the used memory blocks when GC is triggered. The code below also shows how to use shared memory with multiple NumPy arrays (say x and y).

Remarks

With this snippet, you can linearly scale your Keras Sequence pipeline across any number of processes. You can use any python library and write a complete python-based input pipeline.

Let me know if it helps you. Happy coding!

Environment

  • Host Type: p3.8xlarge
  • Python Version: 3.8
  • Tensorflow Version: 2.3
