Python関数で処理#
Polarsでは、ユーザー関数を呼び出してデータの変換や処理を行うことができます。ユーザー関数を使用することで、既存の関数や演算子だけでは実現できない特定の処理を追加することが可能です。本章では、polarsでユーザー関数をどのように定義し、呼び出すかについて詳しく解説します。ユーザー関数を利用することで、より柔軟で効率的なデータ処理を実現することができます。
import polars as pl
import numpy as np
from helper.jupyter import row
pipe#
Polars では、コードの可読性を向上させたり、関数を柔軟に適用したりするために、pipe
メソッドを利用できます。このメソッドは、主に以下の2つの場面で使用されます。
DataFrame.pipe()
Expr.pipe()
それぞれの使い方を具体例を交えて解説します。
DataFrame.pipe#
DataFrame.pipe
メソッドは、関数をチェーン処理の中で適用するための便利なツールです。これにより、複雑なデータ処理を段階的に記述でき、コードの可読性と保守性を向上させることができます。
pipe()
の最初の引数には、処理を実行する関数を渡します。この関数の最初の引数として、DataFrame
自体が自動的に渡されます。また、pipe()
のその他の位置引数やキーワード引数は、そのまま渡された関数に引き継がれます。
df = pl.DataFrame({
"A": [1, 2, 3],
"B": [4, 5, 6],
"C": [7, 8, 9]
})
def add_column(df, col_name, value):
return df.with_columns(pl.lit(value).alias(col_name))
def multiply_column(df, col_name, factor):
return df.with_columns((pl.col(col_name) * factor).alias(col_name))
df_res = (
df.pipe(add_column, col_name="D", value=10)
.pipe(multiply_column, col_name="A", factor=2)
)
row(df, df_res)
shape: (3, 3)
|
shape: (3, 4)
|
Expr.pipe#
演算式の pipe
メソッドを使用することで、演算式に対して柔軟なデータ変換やカスタムロジックを適用できます。以下は、2次式を計算する関数 quadratic()
を定義し、pipe
メソッドを利用して複数の演算式に処理を適用する例です。
pipe()
の最初の引数には、Expr
を処理する関数を指定します。この関数の最初の引数には、演算式が自動的に渡されます。また、pipe()
のその他の位置引数やキーワード引数は、そのまま指定した関数に引き継がれます。
次のコードでは、.pipe(quadratic, ...)
を使って次のような計算を行います。
A
列 に対して \( x^2 + 2 \cdot x + 3 \) を計算し、列名に"_2"
を追加します。B
列 とC
列 の合計に対して \( x^2 + 2 \cdot x \) を計算し、列名に"_q"
を追加します。
def quadratic(x, a=0, b=0, c=0, suffix='_2'):
return (a * x ** 2 + b * x + c).name.suffix(suffix)
df_res = (
df.with_columns(
pl.col("A").pipe(quadratic, 1, 2, 3),
(pl.col("B") + pl.col("C")).pipe(quadratic, a=1, b=2, suffix='_q')
)
)
row(df, df_res)
shape: (3, 3)
|
shape: (3, 5)
|
map_batches#
pl.map_batches
は、DataFrameの複数の列を一括で処理する際に便利な関数です。この関数を使うと、指定した列をユーザー定義関数に渡してカスタムの計算を実行し、その結果を新しい列として追加することができます。
pl.map_batches(column_names, f: Callable[[list[pl.Series]], pl.Series | Any])
column_names
:ユーザー関数に渡す列名或いは演算式のリスト。f
: データを処理するためのユーザー定義関数。この関数は、指定された列のpl.Series
リストを受け取り、新しいpl.Series
または他の値を返します。
次の例では、a
とb
の列を使ってユーザー定義関数hypot
を適用し、その結果を新しい列c
としてDataFrameに追加します。
df = pl.DataFrame(
{
"a": [3, 3, 3, 4],
"b": [4, 12, 6, 7],
"g": ['A', 'B', 'A', 'B']
}
)
def hypot(args:list[pl.Series]):
a, b = args
return (a**2 + b**2)**0.5
df.with_columns(
pl.map_batches(['a', 'b'], hypot).alias('c')
)
a | b | g | c |
---|---|---|---|
i64 | i64 | str | f64 |
3 | 4 | "A" | 5.0 |
3 | 12 | "B" | 12.369317 |
3 | 6 | "A" | 6.708204 |
4 | 7 | "B" | 8.062258 |
column_names
引数には演算式を使用することができます。下のコードでは、a
列の値に1を加えた結果とb
列の値を使って計算します。
df.with_columns(
pl.map_batches([pl.col('a') + 1, 'b'], hypot).alias('c')
)
a | b | g | c |
---|---|---|---|
i64 | i64 | str | f64 |
3 | 4 | "A" | 5.656854 |
3 | 12 | "B" | 12.649111 |
3 | 6 | "A" | 7.211103 |
4 | 7 | "B" | 8.602325 |
演算式には、map_batches()
というメソッドもあります。以下はExpr.map_batches()
を使って演算式の計算結果をユーザー関数で処理する例です。2列のデータは2回に分かり、square()
関数に渡します。
def square(s):
return s**2
df.select(
pl.col('a', 'b').map_batches(square)
)
a | b |
---|---|
i64 | i64 |
9 | 16 |
9 | 144 |
9 | 36 |
16 | 49 |
pl.struct()
を使って複数の列を一つの構造体列に変換し、その後でmap_batches()
を使ってカスタム関数を適用することができます。以下は、複数の列を処理するためのサンプルコードです。
def square2(s):
return s.struct.field('a')**2 + s.struct.field('b')**2
df.select(
pl.struct('a', 'b').map_batches(square2)
)
a |
---|
i64 |
25 |
153 |
45 |
65 |
GroupBy.agg()
のコンテキストで map_batches()
を使用する場合、次の2つの引数がユーザー関数の入出力に影響します。
returns_scalar
(デフォルト:False
)True
の場合、ユーザー関数の戻り値は 1 つのスカラー値になります。False
の場合、スカラー値はリストに変換されます。
agg_list
(デフォルト:False
)True
の場合、各グループの値をリスト型のSeries
オブジェクトとしてユーザー関数に渡します。False
の場合、各グループの値をSeries
として渡し、ユーザー関数はグループごとに複数回実行されます。
次のプログラムは、returns_scalar
引数の影響を比較します。デフォルト値 False
の場合、結果の列には 1 つの値が入ったリストが格納されます。
df = pl.DataFrame(
dict(
g=['A', 'B', 'C', 'A', 'B', 'B'],
x=[1, 2, 3, 4, 5, 6])
)
def func1(s):
return s.mean()
g = df.group_by('g', maintain_order=True)
row(
g.agg(pl.col('x').map_batches(func1)),
g.agg(pl.col('x').map_batches(func1, returns_scalar=True))
)
shape: (3, 2)
|
shape: (3, 2)
|
次のプログラムでは、ユーザー関数内で入力データを print
しています。結果から、ユーザー関数がグループごとに実行されていることがわかります。この場合、演算効率が低下する可能性があります。
def func2(s):
print(s)
return s.mean()
g.agg(pl.col('x').map_batches(func2, returns_scalar=True))
shape: (3,)
Series: '' [i64]
[
2
5
6
]
shape: (2,)
Series: '' [i64]
[
1
4
]
shape: (1,)
Series: '' [i64]
[
3
]
g | x |
---|---|
str | f64 |
"A" | 2.5 |
"B" | 4.333333 |
"C" | 3.0 |
agg_list
引数を True
に設定すると、各グループの値がリストとしてユーザー関数に渡されます。これにより、ユーザー関数は 1 回だけ実行されます。
def func3(s):
print(s)
return s.list.mean()
df.group_by('g', maintain_order=True).agg(pl.col('x').map_batches(func3, agg_list=True))
shape: (3,)
Series: 'x' [list[i64]]
[
[1, 4]
[2, 5, 6]
[3]
]
g | x |
---|---|
str | f64 |
"A" | 2.5 |
"B" | 4.333333 |
"C" | 3.0 |
map_groups#
pl.map_groups()
を使用して各グループのデータをカスタム関数で処理し、その結果を収集することができます。以下のプログラムは、グループごとに、a
列の最大値とb
の最大値の比を計算します。
def ratio_max(args):
a, b = args
return a.max() / b.max()
df.group_by('g').agg(
pl.map_groups(['a', 'b'], ratio_max)
)
g | a |
---|---|
str | f64 |
"B" | 0.333333 |
"A" | 0.5 |
map_elements#
Expr.map_elements()
を使用して演算式の各個値をカスタム関数に渡し、その結果を新しい列として計算することができます。以下の例では、f
関数を使って各値を処理する例です。
def f(x):
if x > 10:
return 'large'
elif x > 5:
return 'middle'
else:
return 'small'
df.with_columns(
pl.col.b.map_elements(f, return_dtype=pl.String).alias('b_category')
)
a | b | g | b_category |
---|---|---|---|
i64 | i64 | str | str |
3 | 4 | "A" | "small" |
3 | 12 | "B" | "large" |
3 | 6 | "A" | "middle" |
4 | 7 | "B" | "middle" |
Expr.map_elements()
は各値を逐一処理するため、データ量が多い場合は演算速度が遅くなる可能性があります。そのため、ベクトル演算やPolarsの組み込み関数で処理できない場合にのみ使用することが推奨されます。例えば、上記の例では、以下のようにベクトル化された条件分岐を使用することもできます:
df.with_columns(
pl.when(pl.col('b') > 10).then(pl.lit('large'))
.when(pl.col('b') > 5).then(pl.lit('middle'))
.otherwise(pl.lit('small'))
.alias('b_category')
)
a | b | g | b_category |
---|---|---|---|
i64 | i64 | str | str |
3 | 4 | "A" | "small" |
3 | 12 | "B" | "large" |
3 | 6 | "A" | "middle" |
4 | 7 | "B" | "middle" |
agg()
とover()
のコンテキストでmap_elements()
を使用する場合、ユーザー関数の引数には各グループに該当する値のSeries
オブジェクトが渡されます。次のコードでは、ユーザー関数f
と演算式を用いて同じ計算を実装します。
def f(s):
return ((s - s[0])**2).mean()
cols = pl.col('a', 'b')
df1 = (
df
.group_by('g', maintain_order=True)
.agg(cols.map_elements(f, return_dtype=pl.Float64))
)
df2 = (
df
.group_by('g')
.agg(((cols - cols.first())**2).mean())
)
row(df1, df2)
shape: (2, 3)
|
shape: (2, 3)
|
rolling_map#
pl.rolling_mean()
などの移動ウィンドウ処理関数で対応できない場合に、rolling_map()
を使用してウィンドウ内のデータをカスタムユーザー関数で処理できます。具体的には、以下のコードは、各ウィンドウ内のデータをユーザー関数に渡し、処理した結果を新しい列として追加します。
Tip
pl.Series._s
:Seriesオブジェクトの内部データにアクセスするために使用します。これにより、実際のデータやそのメモリのIDを確認できます。
arguments
リストに、各ウィンドウ内のデータとその Series オブジェクトのメモリIDが記録されます。効率的にメモリを利用するため、同じSeriesオブジェクトが複数回渡されていることが確認できます。
df = pl.DataFrame(
{
"a": [2, 4, 0, 3, 1, 6]
}
)
arguments = []
def f(s):
arguments.append((id(s._s), s.to_list()))
return s.mean()
df.select(
pl.col('a').rolling_map(f, window_size=3, center=True, min_periods=1)
)
for id_, arg in arguments:
print(id_, arg)
2294865850272 [2, 4]
2294865850272 [2, 4, 0]
2294865850272 [4, 0, 3]
2294865850272 [0, 3, 1]
2294865850272 [3, 1, 6]
2294865850272 [1, 6]
ufunc#
NumPy
の ufunc
関数を直接使用して列あるいは演算式に対して演算を行うことができます。例えば、np.hypot
は二つの値の平方根の和を計算する関数で、Polarsでは次のように使用することができます。
df = pl.DataFrame(
{
"a": [3, 3, 3, 4],
"b": [4, 12, 6, 7],
"g": ['A', 'B', 'A', 'B']
}
)
df.with_columns(
np.hypot(pl.col("a"), pl.col("b")).alias("c")
)
a | b | g | c |
---|---|---|---|
i64 | i64 | str | f64 |
3 | 4 | "A" | 5.0 |
3 | 12 | "B" | 12.369317 |
3 | 6 | "A" | 6.708204 |
4 | 7 | "B" | 8.062258 |
np.hypot()
を使って列を計算する際、Polarsの内部でどのように処理されるかを理解するために、次のように演算式を見てみます。この演算式はa
列を構造体列に変換し、b
列を”argument_1”として追加します。構造体列に対して、python_udf()
で処理します。この関数でNumPyのnp.hypot()
を呼び出します。
np.hypot(pl.col("a"), pl.col("b"))
列名とフィールド名変更#
Expr.name.map()
列名をユーザー関数で変更します。
Expr.name.map_fields()
Structのfield名をユーザー関数で変更します。
def rename(n):
return n.upper() + n
df.select(pl.all().name.map(rename))
Aa | Bb | Gg |
---|---|---|
i64 | i64 | str |
3 | 4 | "A" |
3 | 12 | "B" |
3 | 6 | "A" |
4 | 7 | "B" |
DataFrame.map_rows()
行をTupleとして、ユーザー関数に渡します。
def f(row):
a, b, g = row
return f"{g}:{a + b}", f"{g}:{a - b}"
df.map_rows(f).rename({"column_0": "add", "column_1": "sub"})
add | sub |
---|---|
str | str |
"A:7" | "A:-1" |
"B:15" | "B:-9" |
"A:9" | "A:-3" |
"B:11" | "B:-3" |
辞書で行を処理した場合は、DataFrame.iter_rows()
を使います。
for row in df.iter_rows(named=True):
print(row)
{'a': 3, 'b': 4, 'g': 'A'}
{'a': 3, 'b': 12, 'g': 'B'}
{'a': 3, 'b': 6, 'g': 'A'}
{'a': 4, 'b': 7, 'g': 'B'}