Python関数で処理#

Polarsでは、ユーザー関数を呼び出してデータの変換や処理を行うことができます。ユーザー関数を使用することで、既存の関数や演算子だけでは実現できない特定の処理を追加することが可能です。本章では、polarsでユーザー関数をどのように定義し、呼び出すかについて詳しく解説します。ユーザー関数を利用することで、より柔軟で効率的なデータ処理を実現することができます。

import polars as pl
import numpy as np
from helper.jupyter import row

pipe#

Polars では、コードの可読性を向上させたり、関数を柔軟に適用したりするために、pipe メソッドを利用できます。このメソッドは、主に以下の2つの場面で使用されます。

  1. DataFrame.pipe()

  2. Expr.pipe()

それぞれの使い方を具体例を交えて解説します。

DataFrame.pipe#

DataFrame.pipe メソッドは、関数をチェーン処理の中で適用するための便利なツールです。これにより、複雑なデータ処理を段階的に記述でき、コードの可読性と保守性を向上させることができます。

pipe() の最初の引数には、処理を実行する関数を渡します。この関数の最初の引数として、DataFrame 自体が自動的に渡されます。また、pipe() のその他の位置引数やキーワード引数は、そのまま渡された関数に引き継がれます。

df = pl.DataFrame({
    "A": [1, 2, 3],
    "B": [4, 5, 6],
    "C": [7, 8, 9]
})

def add_column(df, col_name, value):
    return df.with_columns(pl.lit(value).alias(col_name))

def multiply_column(df, col_name, factor):
    return df.with_columns((pl.col(col_name) * factor).alias(col_name))

df_res = (
    df.pipe(add_column, col_name="D", value=10)
      .pipe(multiply_column, col_name="A", factor=2)
)

row(df, df_res)
shape: (3, 3)
ABC
i64i64i64
147
258
369
shape: (3, 4)
ABCD
i64i64i64i32
24710
45810
66910

Expr.pipe#

演算式の pipe メソッドを使用することで、演算式に対して柔軟なデータ変換やカスタムロジックを適用できます。以下は、2次式を計算する関数 quadratic() を定義し、pipe メソッドを利用して複数の演算式に処理を適用する例です。

pipe() の最初の引数には、Expr を処理する関数を指定します。この関数の最初の引数には、演算式が自動的に渡されます。また、pipe() のその他の位置引数やキーワード引数は、そのまま指定した関数に引き継がれます。

次のコードでは、.pipe(quadratic, ...)を使って次のような計算を行います。

  • A列 に対して \( x^2 + 2 \cdot x + 3 \) を計算し、列名に "_2" を追加します。

  • B列 とC列 の合計に対して \( x^2 + 2 \cdot x \) を計算し、列名に "_q" を追加します。

def quadratic(x, a=0, b=0, c=0, suffix='_2'):
    return (a * x ** 2 + b * x + c).name.suffix(suffix)

df_res = (
    df.with_columns(
        pl.col("A").pipe(quadratic, 1, 2, 3),
        (pl.col("B") + pl.col("C")).pipe(quadratic, a=1, b=2, suffix='_q')
    )
)

row(df, df_res)
shape: (3, 3)
ABC
i64i64i64
147
258
369
shape: (3, 5)
ABCA_2B_q
i64i64i64i64i64
1476143
25811195
36918255

map_batches#

pl.map_batchesは、DataFrameの複数の列を一括で処理する際に便利な関数です。この関数を使うと、指定した列をユーザー定義関数に渡してカスタムの計算を実行し、その結果を新しい列として追加することができます。

pl.map_batches(column_names, f: Callable[[list[pl.Series]], pl.Series | Any])
  • column_names:ユーザー関数に渡す列名或いは演算式のリスト。

  • f: データを処理するためのユーザー定義関数。この関数は、指定された列のpl.Seriesリストを受け取り、新しいpl.Seriesまたは他の値を返します。

次の例では、abの列を使ってユーザー定義関数hypotを適用し、その結果を新しい列cとしてDataFrameに追加します。

df = pl.DataFrame(
    {
        "a": [3, 3, 3, 4],
        "b": [4, 12, 6, 7],
        "g": ['A', 'B', 'A', 'B']
    }
)

def hypot(args:list[pl.Series]):
    a, b = args
    return (a**2 + b**2)**0.5
    
df.with_columns(
    pl.map_batches(['a', 'b'], hypot).alias('c')
)
shape: (4, 4)
abgc
i64i64strf64
34"A"5.0
312"B"12.369317
36"A"6.708204
47"B"8.062258

column_names引数には演算式を使用することができます。下のコードでは、a列の値に1を加えた結果とb列の値を使って計算します。

df.with_columns(
    pl.map_batches([pl.col('a') + 1, 'b'], hypot).alias('c')

)
shape: (4, 4)
abgc
i64i64strf64
34"A"5.656854
312"B"12.649111
36"A"7.211103
47"B"8.602325

演算式には、map_batches()というメソッドもあります。以下はExpr.map_batches()を使って演算式の計算結果をユーザー関数で処理する例です。2列のデータは2回に分かり、square()関数に渡します。

def square(s):
    return s**2

df.select(
    pl.col('a', 'b').map_batches(square)
)
shape: (4, 2)
ab
i64i64
916
9144
936
1649

pl.struct()を使って複数の列を一つの構造体列に変換し、その後でmap_batches()を使ってカスタム関数を適用することができます。以下は、複数の列を処理するためのサンプルコードです。

def square2(s):
    return s.struct.field('a')**2 + s.struct.field('b')**2

df.select(
    pl.struct('a', 'b').map_batches(square2)
)
shape: (4, 1)
a
i64
25
153
45
65

GroupBy.agg() のコンテキストで map_batches() を使用する場合、次の2つの引数がユーザー関数の入出力に影響します。

  • returns_scalar(デフォルト: False

    • True の場合、ユーザー関数の戻り値は 1 つのスカラー値になります。

    • False の場合、スカラー値はリストに変換されます。

  • agg_list(デフォルト: False

    • True の場合、各グループの値をリスト型の Series オブジェクトとしてユーザー関数に渡します。

    • False の場合、各グループの値を Series として渡し、ユーザー関数はグループごとに複数回実行されます。

次のプログラムは、returns_scalar 引数の影響を比較します。デフォルト値 False の場合、結果の列には 1 つの値が入ったリストが格納されます。

df = pl.DataFrame(
    dict(
        g=['A', 'B', 'C', 'A', 'B', 'B'], 
        x=[1, 2, 3, 4, 5, 6])
)

def func1(s):
    return s.mean()

g = df.group_by('g', maintain_order=True)

row(
    g.agg(pl.col('x').map_batches(func1)),
    g.agg(pl.col('x').map_batches(func1, returns_scalar=True))
)
shape: (3, 2)
gx
strlist[f64]
"A"[2.5]
"B"[4.333333]
"C"[3.0]
shape: (3, 2)
gx
strf64
"A"2.5
"B"4.333333
"C"3.0

次のプログラムでは、ユーザー関数内で入力データを print しています。結果から、ユーザー関数がグループごとに実行されていることがわかります。この場合、演算効率が低下する可能性があります。

def func2(s):
    print(s)
    return s.mean()

g.agg(pl.col('x').map_batches(func2, returns_scalar=True))
shape: (3,)
Series: '' [i64]
[
	2
	5
	6
]
shape: (2,)
Series: '' [i64]
[
	1
	4
]
shape: (1,)
Series: '' [i64]
[
	3
]
shape: (3, 2)
gx
strf64
"A"2.5
"B"4.333333
"C"3.0

agg_list 引数を True に設定すると、各グループの値がリストとしてユーザー関数に渡されます。これにより、ユーザー関数は 1 回だけ実行されます。

def func3(s):
    print(s)
    return s.list.mean()

df.group_by('g', maintain_order=True).agg(pl.col('x').map_batches(func3, agg_list=True))
shape: (3,)
Series: 'x' [list[i64]]
[
	[1, 4]
	[2, 5, 6]
	[3]
]
shape: (3, 2)
gx
strf64
"A"2.5
"B"4.333333
"C"3.0

map_groups#

pl.map_groups() を使用して各グループのデータをカスタム関数で処理し、その結果を収集することができます。以下のプログラムは、グループごとに、a列の最大値とbの最大値の比を計算します。

def ratio_max(args):
    a, b = args
    return a.max() / b.max()

df.group_by('g').agg(
    pl.map_groups(['a', 'b'], ratio_max)
)
shape: (2, 2)
ga
strf64
"B"0.333333
"A"0.5

map_elements#

Expr.map_elements()を使用して演算式の各個値をカスタム関数に渡し、その結果を新しい列として計算することができます。以下の例では、f 関数を使って各値を処理する例です。

def f(x):
    if x > 10:
        return 'large'
    elif x > 5:
        return 'middle'
    else:
        return 'small'

df.with_columns(
    pl.col.b.map_elements(f, return_dtype=pl.String).alias('b_category')
)
shape: (4, 4)
abgb_category
i64i64strstr
34"A""small"
312"B""large"
36"A""middle"
47"B""middle"

Expr.map_elements()は各値を逐一処理するため、データ量が多い場合は演算速度が遅くなる可能性があります。そのため、ベクトル演算やPolarsの組み込み関数で処理できない場合にのみ使用することが推奨されます。例えば、上記の例では、以下のようにベクトル化された条件分岐を使用することもできます:

df.with_columns(
    pl.when(pl.col('b') > 10).then(pl.lit('large'))
      .when(pl.col('b') > 5).then(pl.lit('middle'))
      .otherwise(pl.lit('small'))
      .alias('b_category')
)
shape: (4, 4)
abgb_category
i64i64strstr
34"A""small"
312"B""large"
36"A""middle"
47"B""middle"

agg()over()のコンテキストでmap_elements()を使用する場合、ユーザー関数の引数には各グループに該当する値のSeriesオブジェクトが渡されます。次のコードでは、ユーザー関数fと演算式を用いて同じ計算を実装します。

def f(s):
    return ((s - s[0])**2).mean()

cols = pl.col('a', 'b')
    
df1 = (
df
.group_by('g', maintain_order=True)
.agg(cols.map_elements(f, return_dtype=pl.Float64))
)

df2 = (
df
.group_by('g')
.agg(((cols - cols.first())**2).mean())
)

row(df1, df2)
shape: (2, 3)
gab
strf64f64
"A"0.02.0
"B"0.512.5
shape: (2, 3)
gab
strf64f64
"B"0.512.5
"A"0.02.0

rolling_map#

pl.rolling_mean()などの移動ウィンドウ処理関数で対応できない場合に、rolling_map()を使用してウィンドウ内のデータをカスタムユーザー関数で処理できます。具体的には、以下のコードは、各ウィンドウ内のデータをユーザー関数に渡し、処理した結果を新しい列として追加します。

Tip

pl.Series._s:Seriesオブジェクトの内部データにアクセスするために使用します。これにより、実際のデータやそのメモリのIDを確認できます。

argumentsリストに、各ウィンドウ内のデータとその Series オブジェクトのメモリIDが記録されます。効率的にメモリを利用するため、同じSeriesオブジェクトが複数回渡されていることが確認できます。

df = pl.DataFrame(
    {
        "a": [2, 4, 0, 3, 1, 6]
    }
)

arguments = []
def f(s):
    arguments.append((id(s._s), s.to_list()))
    return s.mean()

df.select(
    pl.col('a').rolling_map(f, window_size=3, center=True, min_periods=1)
)

for id_, arg in arguments:
    print(id_, arg)
2294865850272 [2, 4]
2294865850272 [2, 4, 0]
2294865850272 [4, 0, 3]
2294865850272 [0, 3, 1]
2294865850272 [3, 1, 6]
2294865850272 [1, 6]

ufunc#

NumPyufunc関数を直接使用して列あるいは演算式に対して演算を行うことができます。例えば、np.hypot は二つの値の平方根の和を計算する関数で、Polarsでは次のように使用することができます。

df = pl.DataFrame(
    {
        "a": [3, 3, 3, 4],
        "b": [4, 12, 6, 7],
        "g": ['A', 'B', 'A', 'B']
    }
)

df.with_columns(
    np.hypot(pl.col("a"), pl.col("b")).alias("c")
)
shape: (4, 4)
abgc
i64i64strf64
34"A"5.0
312"B"12.369317
36"A"6.708204
47"B"8.062258

np.hypot()を使って列を計算する際、Polarsの内部でどのように処理されるかを理解するために、次のように演算式を見てみます。この演算式はa列を構造体列に変換し、b列を”argument_1”として追加します。構造体列に対して、python_udf()で処理します。この関数でNumPyのnp.hypot()を呼び出します。

np.hypot(pl.col("a"), pl.col("b"))
col("a").as_struct([col("b").alias("argument_1")]).python_udf()

列名とフィールド名変更#

Expr.name.map()

列名をユーザー関数で変更します。

Expr.name.map_fields()

Structのfield名をユーザー関数で変更します。

def rename(n):
    return n.upper() + n
    
df.select(pl.all().name.map(rename))
shape: (4, 3)
AaBbGg
i64i64str
34"A"
312"B"
36"A"
47"B"

DataFrame.map_rows()

行をTupleとして、ユーザー関数に渡します。

def f(row):
    a, b, g = row
    return f"{g}:{a + b}", f"{g}:{a - b}"
    
df.map_rows(f).rename({"column_0": "add", "column_1": "sub"})
shape: (4, 2)
addsub
strstr
"A:7""A:-1"
"B:15""B:-9"
"A:9""A:-3"
"B:11""B:-3"

辞書で行を処理した場合は、DataFrame.iter_rows()を使います。

for row in df.iter_rows(named=True):
    print(row)
{'a': 3, 'b': 4, 'g': 'A'}
{'a': 3, 'b': 12, 'g': 'B'}
{'a': 3, 'b': 6, 'g': 'A'}
{'a': 4, 'b': 7, 'g': 'B'}