Slicing a dask dataframe


Suggestion : 1

Dask arrays support NumPy-style slicing (integers, slices, boolean and integer arrays), and slicing a lazy expression only builds tasks for the chunks the slice actually touches:

>>> # Trillion element array of ones, in 1000 by 1000 blocks
>>> x = da.ones((1000000, 1000000), chunks=(1000, 1000))

>>> da.exp(x)[:1500, :1500]
...
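The expression above never materializes the trillion-element array. A self-contained sketch of the same idea (the 1500 x 1500 corner is an arbitrary choice for illustration):

import dask.array as da

# Build the lazy array of ones; nothing is computed yet.
x = da.ones((1000000, 1000000), chunks=(1000, 1000))

# Slicing a lazy expression only adds tasks for the chunks the slice touches.
corner = da.exp(x)[:1500, :1500]

# Only the four 1000x1000 blocks overlapping the slice are ever built.
result = corner.compute()
print(result.shape)  # (1500, 1500)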
>>> # Array of ones, chunked along axis 0
>>> a = da.ones((4, 10000, 10000), chunks=(1, -1, -1))
>>> a[[0, 1], :, :]
dask.array<getitem, shape=(2, 10000, 10000), dtype=float64, chunksize=(1, 10000, 10000), chunktype=numpy.ndarray>
>>> a[[0] * 15, :, :]
PerformanceWarning: Slicing is producing a large chunk. To accept the large
chunk and silence this warning, set the option
    >>> with dask.config.set({'array.slicing.split_large_chunks': False}):
    ...     array[indexer]

To avoid creating the large chunks, set the option
    >>> with dask.config.set({'array.slicing.split_large_chunks': True}):
    ...     array[indexer]
dask.array<getitem, shape=(15, 10000, 10000), dtype=float64, chunksize=(15, 10000, 10000), chunktype=numpy.ndarray>
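If you want the splitting behaviour for a whole session rather than a single statement, dask.config.set can also be called outside a with block. A small sketch using the same option name as in the warning above:

import dask
import dask.array as da

# Set the option globally instead of inside a context manager.
dask.config.set({"array.slicing.split_large_chunks": True})

a = da.ones((4, 10000, 10000), chunks=(1, -1, -1))
b = a[[0] * 15, :, :]  # output is rechunked into smaller pieces, no warning
print(b.chunks)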

Suggestion : 2

However, in your case of train/test splitting, you will probably be better served by the random_split method.

train, test = df.random_split([0.80, 0.20])
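Passing random_state makes the split deterministic, which is handy when you need the same train/test partition across runs. A minimal sketch with a toy DataFrame (the column names are placeholders):

import dask.dataframe as dd
import pandas as pd

# Toy frame purely for illustration.
pdf = pd.DataFrame({"label": range(100), "value": range(100)})
df = dd.from_pandas(pdf, npartitions=4)

# Fixing random_state pins the 80/20 split.
train, test = df.random_split([0.80, 0.20], random_state=42)
print(len(train), len(test))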

You could also make many splits and concatenate them in different ways:

import dask.dataframe as dd

splits = df.random_split([0.20, 0.20, 0.20, 0.20, 0.20])

for i in range(5):
    # Hold out one split as the test set and concatenate the rest for training.
    trains = [splits[j] for j in range(5) if j != i]
    train = dd.concat(trains, axis=0)
    test = splits[i]
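If you use this pattern a lot, it can be wrapped in a small helper. A sketch under the same assumptions as above (the name kfold_splits is my own, not a Dask API):

import dask.dataframe as dd

def kfold_splits(df, k=5, random_state=None):
    # Split into k roughly equal pieces, then let each piece take a turn
    # as the test set while the rest are concatenated into the train set.
    splits = df.random_split([1.0 / k] * k, random_state=random_state)
    for i in range(k):
        train = dd.concat([splits[j] for j in range(k) if j != i], axis=0)
        yield train, splits[i]

# for train, test in kfold_splits(df, k=5, random_state=0):
#     ...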

Suggestion : 3

I’ve got a large Zarr array stored in Google Cloud, along with a Parquet file of metadata that I can load as a Dask DataFrame.

What I’d like is to efficiently slice out a piece of that array using some logical operations that use the dataframe–e.g. yielding a Series of boolean values of which rows I need. An example snippet would look like this:

my_dask_df = dd.from_parquet("gs://...")
my_dask_arr = da.from_zarr("gs://...")

some_data = my_dask_arr[my_dask_df["label"].isin(some_labels), :].compute()

However, it seems in this case that you do know something about the length of the DataFrame, since you made it yourself. In particular, if you know the length of each of the Parquet partitions, then you could do:

idx = my_dask_df["label"].isin(some_labels)
partition_lengths = [PARTITION_LENGTH] * idx.npartitions
# or maybe partition_lengths = [100, 100, 250, 90, ...]
idx_arr = idx.to_dask_array(lengths=partition_lengths).squeeze()
some_data = my_dask_arr[idx_arr]
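If the per-partition row counts aren't known, to_dask_array(lengths=True) will compute them for you, at the cost of one pass over the data. A sketch reusing the names from the snippet above:

idx = my_dask_df["label"].isin(some_labels)

# lengths=True makes Dask count the rows in each partition up front,
# so the resulting array has known chunk sizes and can be used as an indexer.
idx_arr = idx.to_dask_array(lengths=True)
some_data = my_dask_arr[idx_arr]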

Sadly, this is not how Dask works. Simplified, the graph of tasks looks like:

Load parquet          Load zarr
      |                   |
      v                   |
get matching rows         |
           \              |
            v             v
         slice rows from array

Loading data happens before slicing. It is currently impossible, given the very basic definition of how dask works (graphs are statically defined before computation; each chunk is 1 key in the graph), to have an optimization that would rewrite that graph to something like:

Load parquet
   |
   v
get matching rows
   |
   v
decide which zarr chunks to load
   |
   v
load zarr
   |
   v
slice rows from array

This is exactly what you’d be doing if you did:

idx = my_dask_df["label"].isin(some_labels)
idx_arr = idx.values.squeeze().compute()
some_data = my_dask_arr[idx_arr]

So if 90% of the values in idx end up being False, it might actually be worth computing it up front, because then my_dask_arr[idx_arr] will let you skip loading 90% of the Zarr data. Whereas if idx_arr is delayed, you'll always load all the Zarr data, even if you throw 90% of it away immediately.

It sounds like you're worried about the cost of eagerly computing the full boolean mask:

idx = my_dask_df["label"].isin(some_labels)
idx.compute()

Instead, what if you compute just the integer positions of the matching rows:

matches = my_dask_df["label"].isin(some_labels).reset_index(drop=True)
idx = matches[matches].index
# or, using `query`:
idx = my_dask_df.reset_index().query(f"label in {some_labels!r}").index

idx.compute()
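Once idx has been computed down to a concrete (and hopefully small) set of integer positions, indexing the lazy Zarr-backed array with it means only the chunks containing those rows get read. A sketch continuing the snippet above:

# Materialize the integer row positions (a pandas Index).
positions = idx.compute().to_numpy()

# Fancy indexing with concrete positions lets Dask cull the graph so that
# only the Zarr chunks containing those rows are actually loaded.
some_data = my_dask_arr[positions, :].compute()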