pandas and multiprocessing memory management: splitting a dataframe into multiple chunks

Suggestion : 1

There is a simple solution: instead of pool = mp.Pool(n_jobs), I use the new context feature of multiprocessing:

ctx = mp.get_context('spawn')
pool = ctx.Pool(n_jobs)

Note: chunksize is not available for pd.read_pickle or other loading methods whose storage format is compressed, so those files cannot be streamed in chunks.

def main():
    # Job parameters
    n_jobs = 4       # Poolsize
    chunksize = 100  # Maximum number of rows per DataFrame chunk

    # Preparation: spawn the pool instead of forking it
    ctx = mp.get_context('spawn')
    pool = ctx.Pool(n_jobs)

    print('Starting MP')

    # Execute the wait-and-print function in parallel, reading the
    # DataFrame from disk chunk by chunk instead of building it in memory
    df_chunked = pd.read_csv('<filepath>.csv', chunksize=chunksize)  # modified
    pool.imap(just_wait_and_print_len_and_idx, df_chunked)  # modified

    pool.close()
    pool.join()

    print('DONE')
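If the data only exists as a pickle, which pd.read_pickle cannot stream in chunks, one workaround is to load it once and slice it afterwards. The helper below is a hypothetical sketch, not part of the original answer; pickle_chunking is an invented name.

import pandas as pd

def pickle_chunking(path, chunksize):
    """Hypothetical helper: load a pickled DataFrame and yield chunks of at most `chunksize` rows."""
    df = pd.read_pickle(path)  # the whole frame is loaded into memory once
    for start in range(0, len(df), chunksize):
        yield df.iloc[start:start + chunksize].copy()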

Suggestion : 2

I have to process a huge pandas.DataFrame (several tens of GB) on a row-by-row basis, where each row operation is quite lengthy (a couple of tens of milliseconds). So I had the idea to split up the frame into chunks and process each chunk in parallel using multiprocessing. This does speed up the task, but the memory consumption is a nightmare.


import multiprocessing as mp
import os
import time

import numpy as np
import pandas as pd


def just_wait_and_print_len_and_idx(df):
    """Waits for a moment and prints df length and first and last index"""
    # Extract some info
    idx_values = df.index.values
    first_idx, last_idx = idx_values[0], idx_values[-1]
    length = len(df)
    pid = os.getpid()

    # Waste some CPU cycles
    time.sleep(1)

    # Print the info
    print('First idx {}, last idx {} and len {} '
          'from process {}'.format(first_idx, last_idx, length, pid))


def df_chunking(df, chunksize):
    """Splits df into chunks, drops data of original df inplace"""
    count = 0  # Counter for chunks
    while len(df):
        count += 1
        print('Preparing chunk {}'.format(count))
        # Return df chunk
        yield df.iloc[:chunksize].copy()
        # Delete data in place because it is no longer needed
        df.drop(df.index[:chunksize], inplace=True)


def main():
    # Job parameters
    n_jobs = 4            # Poolsize
    size = (10000, 1000)  # Size of DataFrame
    chunksize = 100       # Maximum size of Frame Chunk

    # Preparation
    df = pd.DataFrame(np.random.rand(*size))
    pool = mp.Pool(n_jobs)

    print('Starting MP')

    # Execute the wait and print function in parallel
    pool.imap(just_wait_and_print_len_and_idx, df_chunking(df, chunksize))

    pool.close()
    pool.join()

    print('DONE')


if __name__ == '__main__':
    main()
Starting MP
Preparing chunk 1
Preparing chunk 2
First idx 0, last idx 99 and len 100 from process 9913
First idx 100, last idx 199 and len 100 from process 9914
Preparing chunk 3
First idx 200, last idx 299 and len 100 from process 9915
Preparing chunk 4
...
DONE
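To actually see where the memory goes, one option (not part of the original code; psutil is a third-party dependency) is to print the resident set size from inside the worker:

import os
import psutil  # extra dependency, assumed to be installed

def print_rss(label=''):
    """Hypothetical helper: print the resident memory of the calling process in MiB."""
    rss_mib = psutil.Process(os.getpid()).memory_info().rss / 2**20
    print('{} pid {}: {:.1f} MiB'.format(label, os.getpid(), rss_mib))

Called at the start of just_wait_and_print_len_and_idx, this makes it easy to compare the workers' footprint under the fork and spawn start methods.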

Suggestion : 3

Ok, so I figured it out after the hint by Sebastian Opałczyński in the comments.

The problem is that the child processes are forked from the parent, so all of them contain a reference to the original DataFrame. However, the frame is manipulated in the original process, so the copy-on-write behavior slowly kills the whole thing, and eventually the limit of the physical memory is reached.

There is a simple solution: instead of pool = mp.Pool(n_jobs), I use the new context feature of multiprocessing:

ctx = mp.get_context('spawn')
pool = ctx.Pool(n_jobs)

This guarantees that the Pool processes are just spawned and not forked from the parent process. Accordingly, none of them has access to the original DataFrame, and all of them only need a tiny fraction of the parent's memory.
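One practical detail: with the 'spawn' start method the main module is re-imported in every child, so the pool has to be created under an if __name__ == '__main__' guard. A minimal, self-contained sketch (with a placeholder worker instead of the functions above):

import multiprocessing as mp

def work(chunk):
    # placeholder worker; in the answer this would be just_wait_and_print_len_and_idx
    return len(chunk)

if __name__ == '__main__':          # required: 'spawn' re-imports this module in each child
    ctx = mp.get_context('spawn')   # children are spawned, not forked from the parent
    with ctx.Pool(4) as pool:
        results = list(pool.imap(work, [[1, 2], [3, 4, 5]]))
    print(results)                  # [2, 3]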

Suggestion : 4

Works in Python 2 and 3:

df = pd.DataFrame(data=['a', 'a', 'b', 'c', 'a', 'a', 'b', 'v', 'v', 'f'], columns=['A'])

def iter_by_group(df, column, num_groups):
    groups = []
    for i, group in df.groupby(column):
        groups.append(group)
        if len(groups) == num_groups:
            yield pd.concat(groups)
            groups = []
    if groups:
        yield pd.concat(groups)

for group in iter_by_group(df, 'A', 2):
    print(group)

A
0 a
1 a
4 a
5 a
2 b
6 b

A
3 c
9 f

A
7 v
8 v

This should work as well:

n = 2
splits = {g: df for g, df in df.groupby(df.groupby('A').ngroup().floordiv(n))}

Each df can be accessed by its key in the dictionary. It is also possible to concat them back into one df, which then shows the group number in the index:

pd.concat(splits, names=['groups'])
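For instance, with the example df and n = 2 from above, the dictionary keys are the integer bucket numbers produced by floordiv:

print(sorted(splits.keys()))  # [0, 1, 2]
print(splits[0])              # rows whose 'A' value falls into the first two groups ('a' and 'b')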

The answer from Dennis Golomazov was too slow for my dataframes: storing the groups in a list and returning them with pd.concat() is a performance killer. Here is a slightly faster version. It enumerates the groups and returns them via their group number.

import pandas as pd

def group_chunks(df, column, chunk_size):
    df["n_group"] = df.groupby(column).ngroup()
    lower_group_index = 0
    upper_group_index = chunk_size - 1
    max_group_index = df["n_group"].max()
    while lower_group_index <= max_group_index:
        yield df.loc[:, df.columns != "n_group"][
            df["n_group"].between(lower_group_index, upper_group_index)
        ]
        lower_group_index = upper_group_index + 1
        upper_group_index = upper_group_index + chunk_size

df = pd.DataFrame(data=['a', 'a', 'b', 'c', 'a', 'a', 'b', 'v', 'v', 'f'], columns=['A'])
for chunk in group_chunks(df, 'A', 2):
    print(f"{chunk.sort_values(by='A')}\n")

A
0 a
1 a
4 a
5 a
2 b
6 b

A
3 c
9 f

A
7 v
8 v

Suggestion : 5

pandas provides data structures for in-memory analytics, which makes using pandas to analyze datasets that are larger than memory somewhat tricky. Even datasets that are a sizable fraction of memory become unwieldy, as some pandas operations need to make intermediate copies. This document provides a few recommendations for scaling your analysis to larger datasets; it's a complement to Enhancing performance, which focuses on speeding up analysis for datasets that fit in memory. With pandas.read_csv(), you can specify usecols to limit the columns read into memory (not all file formats that can be read by pandas provide an option to read a subset of columns). If we were to measure the memory usage of the two calls below, we'd see that specifying columns uses about 1/10th the memory in this case.
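As a quick sketch of the usecols option (the CSV file name here is a placeholder mirroring the parquet example below):

import pandas as pd

# Only materialize the four columns we actually need; read_csv supports usecols.
cols = ["id_0", "name_0", "x_0", "y_0"]
df = pd.read_csv("timeseries_wide.csv", usecols=cols)  # placeholder file name

# For parquet, the analogous option is the columns argument shown below:
# pd.read_parquet("timeseries_wide.parquet", columns=cols)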

In[1]: import pandas as pd

In[2]: import numpy as np

                     id_0    name_0       x_0       y_0  id_1   name_1       x_1  ...   name_8       x_8       y_8  id_9   name_9       x_9       y_9
timestamp                                                                         ...
2000-01-01 00:00:00  1015   Michael -0.399453  0.095427   994    Frank -0.176842  ...      Dan -0.315310  0.713892  1025   Victor -0.135779  0.346801
2000-01-01 00:01:00   969  Patricia  0.650773 -0.874275  1003    Laura  0.459153  ...   Ursula  0.913244 -0.630308  1047    Wendy -0.886285  0.035852
2000-01-01 00:02:00  1016    Victor -0.721465 -0.584710  1046  Michael  0.524994  ...      Ray -0.656593  0.692568  1064   Yvonne  0.070426  0.432047
2000-01-01 00:03:00   939     Alice -0.746004 -0.908008   996   Ingrid -0.414523  ...    Jerry -0.958994  0.608210   978    Wendy  0.855949 -0.648988
2000-01-01 00:04:00  1017       Dan  0.919451 -0.803504  1048    Jerry -0.569235  ...    Frank -0.577022 -0.409088   994      Bob -0.270132  0.335176
...                   ...       ...       ...       ...   ...      ...       ...  ...      ...       ...       ...   ...      ...       ...       ...
2000-12-30 23:56:00   999       Tim  0.162578  0.512817   973    Kevin -0.403352  ...      Tim -0.380415  0.008097  1041  Charlie  0.191477 -0.599519
2000-12-30 23:57:00   970     Laura -0.433586 -0.600289   958   Oliver -0.966577  ...    Zelda  0.971274  0.402032  1038   Ursula  0.574016 -0.930992
2000-12-30 23:58:00  1065     Edith  0.232211 -0.454540   971      Tim  0.158484  ...    Alice -0.222079 -0.919274  1022      Dan  0.031345 -0.657755
2000-12-30 23:59:00  1019    Ingrid  0.322208 -0.615974   981   Hannah  0.607517  ...    Sarah -0.424440 -0.117274   990   George -0.375530  0.563312
2000-12-31 00:00:00   937    Ursula -0.906523  0.943178  1018    Alice -0.564513  ...    Jerry  0.236837  0.807650   985   Oliver  0.777642  0.783392

[525601 rows x 40 columns]
In[3]: columns = ["id_0", "name_0", "x_0", "y_0"]

In[4]: pd.read_parquet("timeseries_wide.parquet")[columns]
Out[4]:
                     id_0    name_0       x_0       y_0
timestamp
2000-01-01 00:00:00  1015   Michael -0.399453  0.095427
2000-01-01 00:01:00   969  Patricia  0.650773 -0.874275
2000-01-01 00:02:00  1016    Victor -0.721465 -0.584710
2000-01-01 00:03:00   939     Alice -0.746004 -0.908008
2000-01-01 00:04:00  1017       Dan  0.919451 -0.803504
...                   ...       ...       ...       ...
2000-12-30 23:56:00   999       Tim  0.162578  0.512817
2000-12-30 23:57:00   970     Laura -0.433586 -0.600289
2000-12-30 23:58:00  1065     Edith  0.232211 -0.454540
2000-12-30 23:59:00  1019    Ingrid  0.322208 -0.615974
2000-12-31 00:00:00   937    Ursula -0.906523  0.943178

[525601 rows x 4 columns]
In[5]: pd.read_parquet("timeseries_wide.parquet", columns=columns)
Out[5]:
                     id_0    name_0       x_0       y_0
timestamp
2000-01-01 00:00:00  1015   Michael -0.399453  0.095427
2000-01-01 00:01:00   969  Patricia  0.650773 -0.874275
2000-01-01 00:02:00  1016    Victor -0.721465 -0.584710
2000-01-01 00:03:00   939     Alice -0.746004 -0.908008
2000-01-01 00:04:00  1017       Dan  0.919451 -0.803504
...                   ...       ...       ...       ...
2000-12-30 23:56:00   999       Tim  0.162578  0.512817
2000-12-30 23:57:00   970     Laura -0.433586 -0.600289
2000-12-30 23:58:00  1065     Edith  0.232211 -0.454540
2000-12-30 23:59:00  1019    Ingrid  0.322208 -0.615974
2000-12-31 00:00:00   937    Ursula -0.906523  0.943178

[525601 rows x 4 columns]
In[6]: ts = pd.read_parquet("timeseries.parquet")

In[7]: ts
Out[7]:
                       id      name         x         y
timestamp
2000-01-01 00:00:00  1029   Michael  0.278837  0.247932
2000-01-01 00:00:30  1010  Patricia  0.077144  0.490260
2000-01-01 00:01:00  1001    Victor  0.214525  0.258635
2000-01-01 00:01:30  1018     Alice -0.646866  0.822104
2000-01-01 00:02:00   991       Dan  0.902389  0.466665
...                   ...       ...       ...       ...
2000-12-30 23:58:00   992     Sarah  0.721155  0.944118
2000-12-30 23:58:30  1007    Ursula  0.409277  0.133227
2000-12-30 23:59:00  1009    Hannah -0.452802  0.184318
2000-12-30 23:59:30   978     Kevin -0.904728 -0.179146
2000-12-31 00:00:00   973    Ingrid -0.370763 -0.794667

[1051201 rows x 4 columns]
In[8]: ts.dtypes
Out[8]:
id        int64
name     object
x       float64
y       float64
dtype: object
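The dtype listing above points at the other big memory lever: more compact dtypes. A rough sketch of that idea, assuming ts is the frame loaded above (ts2 is just a working copy, not from the excerpt):

# Convert the repetitive object column to a categorical and downcast the numerics,
# then compare the footprint of the two frames.
ts2 = ts.copy()
ts2["name"] = ts2["name"].astype("category")               # few distinct names
ts2["id"] = pd.to_numeric(ts2["id"], downcast="unsigned")
ts2[["x", "y"]] = ts2[["x", "y"]].apply(pd.to_numeric, downcast="float")

print(ts.memory_usage(deep=True).sum())   # bytes before
print(ts2.memory_usage(deep=True).sum())  # bytes after, noticeably smaller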