What is numpy.core._multiarray_umath.implement_array_function and why does it cost so much time?

Suggestion : 1

You can see additional details in NEP 18, and you can check the function's docstring with help(numpy.core._multiarray_umath.implement_array_function):

Help on built-in function implement_array_function in module numpy.core._multiarray_umath:

implement_array_function(...)
    Implement a function with checks for __array_function__ overrides.

    All arguments are required, and can only be passed by position.

    Arguments
    ---------
    implementation : function
        Function that implements the operation on NumPy array without
        overrides when called like ``implementation(*args, **kwargs)``.
    public_api : function
        Function exposed by NumPy's public API originally called like
        ``public_api(*args, **kwargs)`` on which arguments are now being
        checked.
    relevant_args : iterable
        Iterable of arguments to check for __array_function__ methods.
    args : tuple
        Arbitrary positional arguments originally passed into ``public_api``.
    kwargs : dict
        Arbitrary keyword arguments originally passed into ``public_api``.

    Returns
    -------
    Result from calling ``implementation()`` or an ``__array_function__``
    method, as appropriate.

    Raises
    ------
    TypeError : if no implementation is found.
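For context, this dispatcher exists so that array-like libraries (dask, cupy, sparse, ...) can override NumPy's public functions, as described in NEP 18. Below is a minimal sketch of the protocol it implements; the MyArray class is purely illustrative and not part of the original question. The point is that every call to a dispatched function such as np.sum or np.unique goes through this check, even when the arguments are plain lists or ndarrays, which is the per-call overhead that shows up in profiles.

import numpy as np

class MyArray:
    """Toy array-like container that hooks into NumPy via NEP 18 (hypothetical)."""

    def __init__(self, data):
        self.data = np.asarray(data)

    def __array_function__(self, func, types, args, kwargs):
        # implement_array_function is the C helper that finds this method on the
        # arguments of np.sum(...), np.concatenate(...), etc. and calls it
        # instead of NumPy's own implementation.
        print(f"intercepted {func.__name__}")
        # Delegate to NumPy's implementation on the underlying ndarray.
        unwrapped = [a.data if isinstance(a, MyArray) else a for a in args]
        return func(*unwrapped, **kwargs)

print(np.sum(MyArray([1, 2, 3])))  # prints "intercepted sum", then 6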

Suggestion : 2

I'm using numpy v1.18.2 in some simulations and rely on built-in functions such as np.unique, np.diff and np.interp. I call these functions on standard objects, i.e. lists or numpy arrays. When I look at cProfile, I see that these functions call a built-in method, numpy.core._multiarray_umath.implement_array_function, and that this method accounts for 32.5% of my runtime! As I understand it, this is a wrapper that performs some checks to make sure the arguments passed to a function are compatible with it. I have two questions: Is this function (implement_array_function) actually taking up so much time, or is it really the operations I'm doing (np.unique, np.diff, np.interp) that take up all this time? In other words, am I misreading the cProfile output? I'm confused by the hierarchical output of snakeviz. Please see the snakeviz output here and the function's details here.

import os
# Must be set before numpy is first imported; it disables the __array_function__
# dispatch layer (implement_array_function) in the NumPy version used here (1.18),
# so you can check how much of the runtime was really dispatch overhead.
os.environ['NUMPY_EXPERIMENTAL_ARRAY_FUNCTION'] = '0'
import numpy as np
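On the first question: in cProfile and snakeviz, the cumulative time shown for implement_array_function includes the time spent inside the wrapped NumPy implementation, so a large percentage there does not by itself mean the dispatch is the bottleneck (compare tottime with cumtime). A rough way to gauge the pure per-call overhead, a sketch that is not from the original answer, is to compare a dispatched free function with the equivalent ndarray method, which bypasses __array_function__:

import timeit
import numpy as np

small = np.arange(10)
large = np.arange(10_000_000)

for name, arr, n in [("small (10 elems)", small, 100_000), ("large (1e7 elems)", large, 100)]:
    via_func = timeit.timeit(lambda: np.amin(arr), number=n)  # goes through implement_array_function
    via_meth = timeit.timeit(lambda: arr.min(), number=n)     # direct method call, no dispatch
    print(f"{name}: np.amin {via_func:.3f}s vs arr.min() {via_meth:.3f}s over {n} calls")

On tiny inputs the timings differ noticeably because the fixed Python/C call and dispatch overhead dominates, while on large inputs the two converge because the numerical work dominates. If your profile shows implement_array_function taking a third of the runtime with many calls on small arrays, that really is per-call overhead rather than the numerical work itself.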

Suggestion : 3

I’m experimenting with a new shuffle algorithm for Dask dataframe. This is what backs distributed versions of join, set_index, groupby-apply, or anything that requires the large movement of rows around a distributed dataframe. Implementing this for the general case won’t be difficult, but it will require accessing pandas internals in a way that I’d prefer to avoid having downstream packages do (#40226); I will need to give some thought to where to implement this. After the frames are unpickled, the concat code spends a bunch of time figuring out that they are already aligned (matching columns and block structure), which in this case we know ex ante. I’ll have to give some thought to how to avoid this.

We’ve constructed a script (thanks @gjoseph92 for starting this) that creates a random dataframe and a column on which to split, rearrange, serialize/deserialize, and concat a couple of times. This is representative of the operations that we’re trying to do, except that, in between a couple of the steps, the shards/groups end up coming from different machines rather than being the same shards.

import time
import random
import pickle

import numpy as np
import pandas as pd

# Parameters
n_groups = 10_000
n_cols = 1000
n_rows = 30_000

# Make input data
df = pd.DataFrame(np.random.random((n_rows, n_cols)))
df["partitions"] = (df[0] * n_groups).astype(int)  # random integer labels 0..n_groups

start = time.time()
_, groups = zip(*df.groupby("partitions"))  # split into many small shards

groups = list(groups)
random.shuffle(groups)  # rearrange those shards

groups = [pickle.dumps(group) for group in groups]  # simulate sending across the network
groups = [pickle.loads(group) for group in groups]

df = pd.concat(groups)  # reassemble shards
_, groups = zip(*df.groupby("partitions"))  # and re-split

stop = time.time()

import dask
print(dask.utils.format_bytes(df.memory_usage().sum() / (stop - start)), "/s")

Below I’m pasting an implementation of what I have in mind. It’s about 6x faster than pd.concat on the example in the OP. I’m going to ask you to take the baton to a) test it against a wide variety of DataFrames and b) profile it.

import numpy as np
import pandas as pd
import pandas._testing as tm

def concat_known_aligned(frames: list[pd.DataFrame]):
    """
    pd.concat(frames, axis=0) specialized to the case where we know that

    a) Columns are identical across frames.
    b) Underlying block layout is identical across frames.

    i.e. these frames are generated by something like

        frames = [df.iloc[i:i + 100] for i in range(0, len(df), 100)]

    Notes
    -----
    The caller is responsible for checking these conditions.
    """
    if len(frames) == 0:
        raise ValueError("frames must be non-empty.")

    if frames[0].shape[1] == 0:
        # no columns, can use non-optimized concat cheaply
        return pd.concat(frames, axis=0, ignore_index=True)

    mgrs = [df._mgr for df in frames]
    first = mgrs[0]

    nbs = []
    for i, blk in enumerate(first.blocks):
        arr = blk.values
        arrays = [mgr.blocks[i].values for mgr in mgrs]

        if arr.ndim == 1:
            # i.e. is_1d_only_ea_dtype
            new_arr = arr._concat_same_type(arrays)
        elif not isinstance(arr, np.ndarray):
            new_arr = arr._concat_same_type(arrays, axis=1)
        else:
            new_arr = np.concatenate(arrays, axis=1)

        nb = type(blk)(new_arr, placement=blk.mgr_locs, ndim=2)
        nbs.append(nb)

    index = frames[0].index.append([x.index for x in frames[1:]])
    axes = [frames[0].columns, index]
    new_mgr = type(first)(nbs, axes)
    return pd.DataFrame(new_mgr)

def check_equivalent(frames):
    result = concat_known_aligned(frames)
    expected = pd.concat(frames, axis=0)
    tm.assert_frame_equal(result, expected)

def test():
    df = tm.makeMixedDataFrame()
    frames = [df[i:i + 1] for i in range(len(df))]
    check_equivalent(frames)
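A quick way to exercise this against the shuffle benchmark earlier in the thread (a sketch; pd and the unpickled groups list come from that script, and those shards should satisfy the aligned-blocks assumption because they are all slices of the same dataframe):

import time

# `groups` are the unpickled shards from the benchmark script above.
t0 = time.time()
baseline = pd.concat(groups)
t1 = time.time()
fast = concat_known_aligned(groups)
t2 = time.time()

assert baseline.reset_index(drop=True).equals(fast.reset_index(drop=True))
print(f"pd.concat:            {t1 - t0:.2f}s")
print(f"concat_known_aligned: {t2 - t1:.2f}s")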

It doesn’t look like the np.concatenate C implementation releases the GIL anywhere. FWIW for fun I tried:

In [3]: @numba.jit(nopython=True, nogil=True)
   ...: def concat(arrs):
   ...:     return np.concatenate(arrs)

but the parallel performance was about the same:

In [14]: %ptime -n4 np.concatenate(shards)
Total serial time:   3.19 s
Total parallel time: 1.26 s
For a 2.53X speedup across 4 threads

In [15]: %ptime -n4 concat(shards)
Total serial time:   2.86 s
Total parallel time: 1.24 s
For a 2.30X speedup across 4 threads

Unfortunately plain np.concatenate doesn’t parallelize well to begin with. If NumPy is releasing and re-acquiring the GIL multiple times during concatenation, I don’t know if we can expect pandas to do any better:

In [1]: import numpy as np
In [2]: %load_ext ptime
In [3]: shards = [np.random.random(500_000) for _ in range(400)]
In [4]: %ptime -n4 np.concatenate(shards)
Total serial time:   3.30 s
Total parallel time: 1.90 s
For a 1.74X speedup across 4 threads
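For reference, %ptime comes from the ptime IPython extension; roughly the same comparison can be reproduced without it using a plain thread pool. This is a sketch of such a measurement, not code from the thread. Because np.concatenate holds the GIL for most of its work, the threaded run is not expected to approach the ideal 4x:

import time
from concurrent.futures import ThreadPoolExecutor

import numpy as np

shards = [np.random.random(500_000) for _ in range(400)]

# Run the same concatenation 4 times back to back...
t0 = time.perf_counter()
for _ in range(4):
    np.concatenate(shards)
serial = time.perf_counter() - t0

# ...and 4 times concurrently in 4 threads.
t0 = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(np.concatenate, [shards] * 4))
parallel = time.perf_counter() - t0

print(f"serial: {serial:.2f} s, 4 threads: {parallel:.2f} s, "
      f"{serial / parallel:.2f}x speedup (ideal: 4x)")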

Testing locally on my mac, with DataFrames of around the same size and number as we were using on this cluster, it looks like your new method is a 1.4x speedup over pd.concat single-threaded, and a 1.8x speedup over pd.concat multi-threaded. This is great—getting nearly 2x faster will help a lot! However, concat_known_aligned is still very GIL-bound:

Shard size: 512.00 kiB, num shards to concatenate: 400
Serial    8x pd.concat:             1.9 s,  3.28 GiB/s
4 threads 8x pd.concat:             1.4 s,  4.31 GiB/s, 1.31x parallel speedup (ideal: 4x)
Serial    8x concat_known_aligned:  1.3 s,  4.64 GiB/s, 1.41x speedup over pd.concat serial
4 threads 8x concat_known_aligned:  0.77 s, 8.02 GiB/s, 1.73x parallel speedup (ideal: 4x), 1.86x speedup over pd.concat parallel

Suggestion : 4

Is there any kind of documentation on how to optimize the speed of Panel applications? There may be some way to cache the compiled numba code (see "Notes on Caching" in the Numba 0.50.1 documentation). Alternatively, I can only suggest rendering the app without that component, adding a loading indicator, and then using pn.state.onload to schedule a callback which renders the map once the page itself has rendered. Okay, a lot of it is indeed in the numba JIT compiler, which means that you should only incur that cost the first time the plots are rendered; subsequent renders should not incur the same cost. Otherwise I don't really see any targets for you to optimize, but there are some possible candidates for internal optimizations in HoloViews and Panel.
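For context, numba's on-disk caching referenced above looks like this in general. This is a generic sketch of the mechanism, not datashader's actual code, and whether datashader's internal jitted functions enable it is a separate question:

import numba
import numpy as np

@numba.jit(nopython=True, cache=True)  # cache=True persists the compiled machine code to disk
def masked_sum(values, mask):
    total = 0.0
    for i in range(values.shape[0]):
        if mask[i]:
            total += values[i]
    return total

# The first call in a fresh process triggers compilation (or loads the cached binary);
# later processes skip the JIT cost as long as the cache files are intact.
masked_sum(np.random.random(1000), np.random.random(1000) > 0.5)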

My first Panel/Holoviews/Datashader app is almost ready to deploy, except that it takes about 8 s for the site to load. Chrome showed it is a 6.9 kB file, so it was transferred in 3 ms, but it was indeed waiting for the server for 7.7 s. So I jumped into the code and quickly printed out timing logs. It turns out the whole code executes in about half a second, and then the final servable() takes about 7 s. As a next step I started commenting out bits and pieces, and I found a block that is responsible for about 6.8 s, namely this:

combined = (
    hv.DynamicMap(get_tiles)

    * hd.regrid(hv.DynamicMap(get_image_01)).apply.opts(cmap=pn_01_cmap, alpha=pn_01_alpha)
    * hd.regrid(hv.DynamicMap(get_image_02)).apply.opts(cmap=pn_02_cmap, alpha=pn_02_alpha)
    * hd.regrid(hv.DynamicMap(get_image_03)).apply.opts(cmap=pn_03_cmap, alpha=pn_03_alpha)
    * hd.regrid(hv.DynamicMap(get_image_04)).apply.opts(cmap=pn_04_cmap, alpha=pn_04_alpha)
    * hd.regrid(hv.DynamicMap(get_image_05)).apply.opts(cmap=pn_05_cmap, alpha=pn_05_alpha)
    * hd.regrid(hv.DynamicMap(get_image_06)).apply.opts(cmap=pn_06_cmap, alpha=pn_06_alpha)
    * hd.regrid(hv.DynamicMap(get_image_07)).apply.opts(cmap=pn_07_cmap, alpha=pn_07_alpha)
    * hd.regrid(hv.DynamicMap(get_image_08)).apply.opts(cmap=pn_08_cmap, alpha=pn_08_alpha)

    * hd.regrid(hv.DynamicMap(get_white_bg_for_points)).options(cmap=['white']).apply.opts(alpha=pn_white_bg_for_points_alpha)
    * points_aggregated_categorical
    * points_aggregated_for_hover
    * hv_points
    * hd.regrid(hv.DynamicMap(get_image_for_hover_tooltip))
)

I don’t expect anybody to be able to debug this without working code, but maybe something obvious jumps out from this for somebody. Most layers above use a similar way to load the image based on a few widget values, like this:

@pn.depends(
    pn_layer_is_active=pn_is_active['01'],
    pn_resolution=pn_resolutions.param.value,
    pn_date=pn_dates.param.value_throttled,
    pn_01_type=pn_01_types.param.value,
)
def get_image_01(pn_layer_is_active, pn_resolution, pn_date, pn_01_type):
    ret = get_image_generic('01', pn_layer_is_active, pn_resolution, pn_date, pn_01_type)
    return ret

def get_image_generic(category_name, pn_layer_is_active, pn_resolution, pn_date=None, subcategory_name='-'):
    layer_is_active = int(pn_layer_is_active)
    customn_js_hover_tool = get_export_hover_tool(category_name)

    if layer_is_active:
        subcategory_name = subcategory_name.lower()
        if pn_date:
            this_image = load_image(pn_resolution=pn_resolution, category_name=category_name,
                                    subcategory_name=subcategory_name, pn_date=pn_date)
        else:
            this_image = load_image(pn_resolution=pn_resolution, category_name=category_name,
                                    subcategory_name=subcategory_name)

        used_logz = (source_data[category_name][subcategory_name]['logz']
                     if 'logz' in source_data[category_name][subcategory_name]
                     else False)
        used_min_val = source_data[category_name][subcategory_name]['min']
        used_max_val = source_data[category_name][subcategory_name]['max']

        this_image.opts(
            logz=used_logz,
            clim=(used_min_val, used_max_val),
            tools=[customn_js_hover_tool],
        )
    else:
        this_image = image_filled_with_nan.options(alpha=0, colorbar=False, tools=[customn_js_hover_tool])
    return this_image

def get_export_hover_tool(label):
    custom_tooltips = [
        ('Export Tooltip [' + str(label) + ']', '@image{custom}'),
    ]

    code = """
    var label = "**label**";
    if (typeof(window['saved_tooltips']) == "undefined") {
        window['saved_tooltips'] = {};
    }
    window['saved_tooltips'][label] = value;
    return value;
    """
    code = code.replace('**label**', label)

    custom_formatters = {
        '@image': bk.models.CustomJSHover(code=code),
    }
    this_hover_export = bk.models.HoverTool(tooltips=custom_tooltips, formatters=custom_formatters)
    return this_hover_export

I suspect a lot of time is spent on the numba JIT compilation of the calls into datashader, but I can’t be entirely sure. You could run a profiler in a notebook to see what takes so long. In a notebook you could do:

%%prun
hv.render(combined)
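In addition to %%prun, one quick way to narrow down which of the layers is responsible for the slow first render is to time them one at a time. This is a diagnostic sketch using the objects from the app above (get_tiles, hd, hv_points, ...); hv.render forces HoloViews/Bokeh model construction, which is roughly what the server has to do on page load:

import time
import holoviews as hv

layers = {
    "tiles": hv.DynamicMap(get_tiles),
    "image_01": hd.regrid(hv.DynamicMap(get_image_01)),
    "white_bg": hd.regrid(hv.DynamicMap(get_white_bg_for_points)),
    "points": hv_points,
}

for name, layer in layers.items():
    t0 = time.perf_counter()
    hv.render(layer)  # build the Bokeh model for this layer on its own
    print(f"{name}: {time.perf_counter() - t0:.1f} s")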

Meanwhile, how should I start with pn.state.onload? The only examples I found are these:

# https://panel.holoviz.org/user_guide/Deploy_and_Export.html
def app():
    widget = pn.widgets.Select()
    def on_load():
        time.sleep(1)  # Emulate some long running process
        widget.options = ['A', 'B', 'C']
    pn.state.onload(on_load)
    return widget

# https://panel.holoviz.org/gallery/simple/defer_data_load.html
select_ticker = pn.widgets.Select(name='Stock Ticker')

def load_data():
    if 'stocks' not in pn.state.cache:
        pn.state.cache['stocks'] = df = pd.read_csv(
            stocks_url, parse_dates=['date']).set_index('symbol')
    else:
        df = pn.state.cache['stocks']
    symbols = list(df.index.unique())
    select_ticker.options = symbols
    select_ticker.value = symbols[0]

pn.state.onload(load_data)

So I assume I misunderstood how on_load works, and I would instead need something like this mockup code, where # ? and ALL CAPS mark fake code that does not exist in this form:

combined = (hv.DynamicMap(get_tiles) * hd.regrid(initial_content))

def load_data():
    IF 'initial_content' IN combined:             # ?
        combined.REMOVE_LAYER('initial_content')  # ???

    if 'image01' not in pn.state.cache:
        pn.state.cache['image01'] = get_image_01()
    image01 = pn.state.cache['image01']
    combined.ADD_LAYER(                           # ???
        hd.regrid(hv.DynamicMap(image01)).apply.opts(
            cmap=pn_01_cmap, alpha=pn_01_alpha)
    )

    if 'image02' not in pn.state.cache:
        pn.state.cache['image02'] = get_image_02()
    image02 = pn.state.cache['image02']
    combined.ADD_LAYER(                           # ???
        hd.regrid(hv.DynamicMap(image02)).apply.opts(
            cmap=pn_02_cmap, alpha=pn_02_alpha)
    )

pn.state.onload(load_data)

%%prun
hv.render(combined)
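HoloViews overlays don't support adding and removing layers by name like that, but the advice from the thread (render the page first, then build the heavy overlay in an onload callback) can be expressed with a plain Panel container whose contents are swapped once the page has loaded. Below is a rough sketch of that pattern, assuming build_combined() wraps the expensive overlay construction from the original app; the function name and the spinner are illustrative, not from the original code:

import panel as pn
import holoviews as hv

pn.extension()

# Placeholder shown immediately while the real plot is still being built.
placeholder = pn.Column(
    pn.indicators.LoadingSpinner(value=True, width=50, height=50),
    "Building map layers...",
)

def build_combined():
    # Hypothetical wrapper around the expensive overlay construction
    # (the big DynamicMap/regrid expression from the original app).
    return hv.DynamicMap(get_tiles)  # stand-in for the full `combined` overlay

def on_load():
    # Runs after the page itself has rendered, so the user sees the spinner
    # instead of waiting ~7 s for a blank response from the server.
    placeholder.objects = [pn.pane.HoloViews(build_combined(), sizing_mode="stretch_both")]

pn.state.onload(on_load)
placeholder.servable()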