What you probably want is multiprocessing.Pool.map(), which cuts your iterable into chunks for you (so you don't need the block_size bookkeeping), and it guarantees that the results come back in order. I've reworked your code to use Pool.map:
import multiprocessing

def data_write_func(line):
    # pool workers are numbered 1..N; use that number to pick an output file
    i = multiprocessing.current_process()._identity[0]
    line = [x * 2 for x in line]
    files[i - 1].write(",".join(str(s) for s in line) + "\n")

N = 4
mydata = [[x + 1, x + 2, x + 3, x + 4] for x in range(0, 4000 * N, 4)]  # fake some data
files = [open('data_part_' + str(i), 'w') for i in range(N)]

pool = multiprocessing.Pool(processes=N)
pool.map(data_write_func, mydata)
pool.close()
pool.join()
So if you do this in your code:
h5file = h5py.File(dataFile, 'w')
dset = h5file.create_dataset('train', data=mydata)
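To make this concrete, here is a minimal sketch that ties the two pieces together (the file name 'train.hdf5' and the helper process_line are my own assumptions, not from the question): do the per-row processing in a worker pool, then write the collected, in-order result to HDF5 from the parent process only.

import multiprocessing
import h5py

def process_line(line):
    # hypothetical helper: the "do some processing" step from the question
    return [x * 2 for x in line]

if __name__ == '__main__':
    N = 4
    mydata = [[x + 1, x + 2, x + 3, x + 4] for x in range(0, 4000 * N, 4)]

    with multiprocessing.Pool(processes=N) as pool:
        processed = pool.map(process_line, mydata)   # results come back in input order

    with h5py.File('train.hdf5', 'w') as h5file:     # assumed file name
        h5file.create_dataset('train', data=processed)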
The issue could not be replicated. Here is my full code:
#!/usr/bin/env python
import multiprocessing

N = 4
mydata = [[x + 1, x + 2, x + 3, x + 4] for x in range(0, 4000 * N, 4)]  # fake some data

def data_write_func(mydata, i, block_size=1000):
    fout = open('data_part_' + str(i), 'w')
    data_part = mydata[block_size * i: block_size * i + block_size]
    for line in data_part:
        # do some processing, say * 2 for each element...
        line = [x * 2 for x in line]
        # then write out..
        fout.write(','.join(map(str, line)) + '\n')
    fout.close()

pool = multiprocessing.Pool(processes=N)
for i in range(2):
    pool.apply_async(data_write_func, (mydata, i,))
pool.close()
pool.join()
Sample output from data_part_0:
2,4,6,8
10,12,14,16
18,20,22,24
26,28,30,32
34,36,38,40
42,44,46,48
50,52,54,56
58,60,62,64
The parallel features of HDF5 are mostly transparent. To open a file shared across multiple processes, you use the mpio file driver; an example program which opens a file, creates a single dataset and fills it with the process ID is shown further below, after the build instructions.

Operations on a shared file fall into two classes, called collective and independent. Anything which modifies the structure or metadata of a file must be done collectively; for example, when creating a group, each process must participate (see the sketch after the mpio example below).

Read-only parallel access to HDF5 files works with no special preparation: each process should open the file independently and read data normally (avoid opening the file and then forking).

Parallel HDF5 is driven through MPI, used from Python via mpi4py. The mpi4py package includes all kinds of mechanisms to share data between processes, synchronize, etc. It's a different flavor of parallelism than, say, threads or multiprocessing, but easy to get used to. A minimal "Hello World" (saved as demo.py and run with mpiexec) looks like this:
>>> from mpi4py import MPI
>>> print("Hello World (from process %d)" % MPI.COMM_WORLD.Get_rank())
$ mpiexec -n 4 python demo.py
Hello World (from process 1)
Hello World (from process 2)
Hello World (from process 3)
Hello World (from process 0)
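As a small illustration of those data-sharing mechanisms (this snippet is my own sketch, not from the original text), mpi4py processes can exchange arbitrary picklable Python objects with send and recv:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    # process 0 sends a Python object to process 1
    comm.send({'msg': 'hello', 'value': 42}, dest=1, tag=0)
elif rank == 1:
    # process 1 blocks until the object arrives
    data = comm.recv(source=0, tag=0)
    print("process 1 received:", data)

Run it the same way as the hello-world script, e.g. mpiexec -n 2 python demo.py.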
To use these features, HDF5 itself must be built with parallel support:

$ ./configure --enable-parallel --enable-shared
You can check whether an existing HDF5 build already has parallel support with:

$ h5cc -showconfig
Then build h5py against that parallel HDF5 installation:

$ export CC=mpicc
$ export HDF5_MPI="ON"
$ export HDF5_DIR="/path/to/parallel/hdf5"  # If this isn't found by default
$ pip install .
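As a quick sanity check (my own addition, not part of the build instructions), you can ask h5py itself whether it was compiled with MPI support:

import h5py

print(h5py.version.info)        # build summary, including the HDF5 version
print(h5py.get_config().mpi)    # True if h5py was built against parallel HDF5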
Here is the example program mentioned above: it opens a shared file with the mpio driver, creates a single dataset and fills it with the process ID.

from mpi4py import MPI
import h5py

rank = MPI.COMM_WORLD.rank  # The process ID (integer 0-3 for a 4-process run)

f = h5py.File('parallel_test.hdf5', 'w', driver='mpio', comm=MPI.COMM_WORLD)

dset = f.create_dataset('test', (4,), dtype='i')
dset[rank] = rank

f.close()
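And here is the group-creation sketch referred to earlier (my own illustration, with an assumed file name): structural changes such as create_group and create_dataset are collective, so every process executes them, while writing individual elements of an existing dataset can be done independently.

from mpi4py import MPI
import h5py

rank = MPI.COMM_WORLD.rank

f = h5py.File('collective_test.hdf5', 'w', driver='mpio', comm=MPI.COMM_WORLD)

# collective: every process must participate in creating the group,
# even though the group appears only once in the file
grp = f.create_group('measurements')

# collective: dataset creation also modifies file structure
dset = grp.create_dataset('ranks', (MPI.COMM_WORLD.size,), dtype='i')

# independent: each process writes its own element
dset[rank] = rank

f.close()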
The following example is excerpted from Python Programming and Numerical Methods - A Guide for Engineers and Scientists; the content is also available at Berkeley Python Numerical Methods.

After installing h5py, you can follow the quick start guide in the h5py documentation, but here let's use one example to show how to create and read an HDF5 file. Saving data in HDF5 is easy: we could use the functions create_dataset and create_group as shown in the quick start, but I prefer the approach used below, assigning to paths such as '/acc/1/data', which creates the intermediate groups implicitly, much like working with a folder structure. Let's import NumPy and h5py first.
import numpy as np
import h5py
# Generate random data for recording
acc_1 = np.random.random(1000)
station_number_1 = '1'
# unix timestamp
start_time_1 = 1542000276
# time interval for recording
dt_1 = 0.04
location_1 = 'Berkeley'

acc_2 = np.random.random(500)
station_number_2 = '2'
start_time_2 = 1542000576
dt_2 = 0.01
location_2 = 'Oakland'
hf = h5py.File('station.hdf5', 'w')
hf['/acc/1/data'] = acc_1
hf['/acc/1/data'].attrs['dt'] = dt_1
hf['/acc/1/data'].attrs['start_time'] = start_time_1
hf['/acc/1/data'].attrs['location'] = location_1

hf['/acc/2/data'] = acc_2
hf['/acc/2/data'].attrs['dt'] = dt_2
hf['/acc/2/data'].attrs['start_time'] = start_time_2
hf['/acc/2/data'].attrs['location'] = location_2

hf['/gps/1/data'] = np.random.random(100)
hf['/gps/1/data'].attrs['dt'] = 60
hf['/gps/1/data'].attrs['start_time'] = 1542000000
hf['/gps/1/data'].attrs['location'] = 'San Francisco'

# close the file so it can be reopened for reading below
hf.close()
Now suppose you send station.hdf5 to a colleague who wants to get access to the data. Here is how they would read it:

hf_in = h5py.File('station.hdf5', 'r')
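A minimal sketch of how the contents could then be inspected (variable names here are my own, but the paths and attribute names come from the writing code above):

# top-level groups in the file
print(list(hf_in.keys()))                 # e.g. ['acc', 'gps']

# read one dataset back into a NumPy array
acc_1_in = hf_in['/acc/1/data'][:]

# metadata is stored as attributes on the dataset
meta = hf_in['/acc/1/data'].attrs
print(meta['dt'], meta['start_time'], meta['location'])

hf_in.close()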