Python関数で処理#

Polarsでは、ユーザー関数を呼び出してデータの変換や処理を行うことができます。ユーザー関数を使用することで、既存の関数や演算子だけでは実現できない特定の処理を追加することが可能です。本章では、polarsでユーザー関数をどのように定義し、呼び出すかについて詳しく解説します。ユーザー関数を利用することで、より柔軟で効率的なデータ処理を実現することができます。

import polars as pl
import numpy as np
from helper.jupyter import row

pipe#

Polars では、コードの可読性を向上させたり、関数を柔軟に適用したりするために、pipe メソッドを利用できます。このメソッドは、主に以下の2つの場面で使用されます。

  1. DataFrame.pipe()

  2. Expr.pipe()

それぞれの使い方を具体例を交えて解説します。

DataFrame.pipe#

DataFrame.pipe メソッドは、関数をチェーン処理の中で適用するための便利なツールです。これにより、複雑なデータ処理を段階的に記述でき、コードの可読性と保守性を向上させることができます。

pipe() の最初の引数には、処理を実行する関数を渡します。この関数の最初の引数として、DataFrame 自体が自動的に渡されます。また、pipe() のその他の位置引数やキーワード引数は、そのまま渡された関数に引き継がれます。

df = pl.DataFrame({
    "A": [1, 2, 3],
    "B": [4, 5, 6],
    "C": [7, 8, 9]
})

def add_column(df, col_name, value):
    return df.with_columns(pl.lit(value).alias(col_name))

def multiply_column(df, col_name, factor):
    return df.with_columns((pl.col(col_name) * factor).alias(col_name))

df_res = (
    df.pipe(add_column, col_name="D", value=10)
      .pipe(multiply_column, col_name="A", factor=2)
)

row(df, df_res)
shape: (3, 3)
ABC
i64i64i64
147
258
369
shape: (3, 4)
ABCD
i64i64i64i32
24710
45810
66910

Expr.pipe#

演算式の pipe メソッドを使用することで、演算式に対して柔軟なデータ変換やカスタムロジックを適用できます。以下は、2次式を計算する関数 quadratic() を定義し、pipe メソッドを利用して複数の演算式に処理を適用する例です。

pipe() の最初の引数には、Expr を処理する関数を指定します。この関数の最初の引数には、演算式が自動的に渡されます。また、pipe() のその他の位置引数やキーワード引数は、そのまま指定した関数に引き継がれます。

次のコードでは、.pipe(quadratic, ...)を使って次のような計算を行います。

  • A列 に対して \( x^2 + 2 \cdot x + 3 \) を計算し、列名に "_2" を追加します。

  • B列 とC列 の合計に対して \( x^2 + 2 \cdot x \) を計算し、列名に "_q" を追加します。

def quadratic(x, a=0, b=0, c=0, suffix='_2'):
    return (a * x ** 2 + b * x + c).name.suffix(suffix)

df_res = (
    df.with_columns(
        pl.col("A").pipe(quadratic, 1, 2, 3),
        (pl.col("B") + pl.col("C")).pipe(quadratic, a=1, b=2, suffix='_q')
    )
)

row(df, df_res)
shape: (3, 3)
ABC
i64i64i64
147
258
369
shape: (3, 5)
ABCliteral_2literal_q
i64i64i64i64i64
1476143
25811195
36918255

map_batches#

pl.map_batches は、DataFrame の複数の列をまとめて処理する際に便利な関数です。この関数を使うと、指定した列をユーザー定義関数に渡してカスタム計算を行い、その結果を新しい列として追加できます。

pl.map_batches(
    column_names, 
    f: Callable[[Sequence[Series]], Series | Any], 
    return_dtype: PolarsDataType | pl.DataTypeExpr | None = None
)
  • column_names:ユーザー関数に渡す列名または式のリスト。

  • f:データを処理するユーザー定義関数。この関数は、指定した列の pl.Series のリストを受け取り、新しい pl.Series またはその他の値を返します。

  • return_dtype:ユーザー関数の戻り値の要素のデータ型。デフォルト値 None の場合は、自動的にデータ型が推定されます。

以下の例では、a 列と b 列を使ってユーザー定義関数 hypot を適用し、その結果を新しい列 c として DataFrame に追加しています。

df = pl.DataFrame(
    {
        "a": [3, 3, 3, 4],
        "b": [4, 12, 6, 7],
        "g": ['A', 'B', 'A', 'B']
    }
)

def hypot(args:list[pl.Series]):
    a, b = args
    return (a**2 + b**2)**0.5
    
df.with_columns(
    pl.map_batches(['a', 'b'], hypot, return_dtype=pl.Float64).alias('c')
)
shape: (4, 4)
abgc
i64i64strf64
34"A"5.0
312"B"12.369317
36"A"6.708204
47"B"8.062258

column_names引数には演算式を使用することができます。下のコードでは、a列の値に1を加えた結果とb列の値を使って計算します。

df.with_columns(
    pl.map_batches([pl.col('a') + 1, 'b'], hypot).alias('c')

)
shape: (4, 4)
abgc
i64i64strf64
34"A"5.656854
312"B"12.649111
36"A"7.211103
47"B"8.602325

演算式とユーザー関数以外に、pl.map_batches() には次の引数があります。

  • return_dtype(デフォルト: None): ユーザー関数が返す Series の要素のデータ型 を指定します。 指定しない場合、Polars がユーザー関数の出力型を推定します。

  • is_elementwise(デフォルト: False): ユーザー関数が “要素ごとの処理(elementwise)” であることを Polars に知らせるためのフラグです。 True にすると、グループ化の有無に関係なく、入力データ全体が一括でユーザー関数に渡されます。

  • returns_scalar(デフォルト: False): ユーザー関数が スカラー値を返す 場合に指定します。 指定しないと、Polars は Series を返す関数として扱います。

以下の f1()f2()f3() は、それぞれの引数の動作を示す例です。

  • f1():要素同士の計算のみを行い、Series を返す

  • f2():要素同士の計算ではないが、Series を返す

  • f3():Series ではなく スカラー値 を返す

import threading

lock = threading.Lock()

def f1(args):
    with lock:
        a, b = args
        print(f"{a.to_list()} {b.to_list()}")
        return a + b

def f2(args):
    with lock:
        a, b = args
        print(f"{a.to_list()} {b.to_list()}")
        return a - b.min()

def f3(args):
    with lock:
        a, b = args
        print(f"{a.to_list()} {b.to_list()}")
        return a.max() - b.min()

return_dtype を指定しない場合、Polars はユーザー関数の出力型を推定するために ダミーの入力([1, 1])を使って関数を一度実行します。

df.select(
    pl.map_batches(['a', 'b'], f1)
);
[1, 1] [1, 1]
[3, 3, 3, 4] [4, 12, 6, 7]

return_dtype を指定すると、このダミー実行がなくなります。

df.select(
    pl.map_batches(['a', 'b'], f1, return_dtype=pl.Int64)
);
[3, 3, 3, 4] [4, 12, 6, 7]

agg() のコンテキストで is_elementwise の動作がより明確に分かります。Falseの場合、 各グループのデータが グループごとに ユーザー関数に渡されます。 常に正しい結果が得られますが、関数が複数回呼ばれるため遅くなります。

df.group_by('g', maintain_order=True).agg(
    pl.map_batches(['a', 'b'], f1, return_dtype=pl.Int64, is_elementwise=False).alias('f1'),
    pl.map_batches(['a', 'b'], f2, return_dtype=pl.Int64, is_elementwise=False).alias('f2'),
)
[3, 3] [4, 6]
[3, 3] [4, 6]
[3, 4] [12, 7]
[3, 4] [12, 7]
shape: (2, 3)
gf1f2
strlist[i64]list[i64]
"A"[7, 9][-1, -1]
"B"[15, 11][-4, -3]

Trueの場合、グループとは無関係に、すべてのデータが一度に ユーザー関数に渡されます。

  • f1() のような “要素同士の計算” であれば正しい結果

  • f2() のように「要素ごとの計算ではない」関数は 誤った結果になる

df.group_by('g', maintain_order=True).agg(
    pl.map_batches(['a', 'b'], f1, return_dtype=pl.Int64, is_elementwise=True).alias('f1'),
    pl.map_batches(['a', 'b'], f2, return_dtype=pl.Int64, is_elementwise=True).alias('f2'),
)
[3, 3, 3, 4] [4, 12, 6, 7]
[3, 3, 3, 4] [4, 12, 6, 7]
shape: (2, 3)
gf1f2
strlist[i64]list[i64]
"A"[7, 9][-1, -1]
"B"[15, 11][-1, 0]

f3() はスカラー値(1つの数値)を返すため、returns_scalar=True を指定しないとエラーになります。

df.group_by('g', maintain_order=True).agg(
    pl.map_batches(['a', 'b'], f3, return_dtype=pl.Int64, returns_scalar=True).alias('f3'),
)
[3, 3] [4, 6]
[3, 4] [12, 7]
shape: (2, 2)
gf3
stri64
"A"-1
"B"-3

演算式には、map_batches()というメソッドもあります。以下はExpr.map_batches()を使って演算式の計算結果をユーザー関数で処理する例です。2列のデータは2回に分かり、square()関数に渡します。

def square(s):
    return s**2

df.select(
    pl.col('a', 'b').map_batches(square)
)
shape: (4, 2)
ab
i64i64
916
9144
936
1649

pl.struct()を使って複数の列を一つの構造体列に変換し、その後でmap_batches()を使ってカスタム関数を適用することができます。以下は、複数の列を処理するためのサンプルコードです。

def square2(s):
    return s.struct.field('a')**2 + s.struct.field('b')**2

df.select(
    pl.struct('a', 'b').map_batches(square2)
)
shape: (4, 1)
a
i64
25
153
45
65

map_groups#

pl.map_groups() を使うと、各グループのデータをユーザー定義関数で処理し、その結果をまとめて取得できます。 実装上は map_groups()map_batches() は同じ仕組みで動作していますが、agg() コンテキストで使用する場合は、map_groups() を使うことでコードの意図がより明確になり、読みやすくなります。

以下の例では、各グループごとに a 列の最大値と b 列の最大値の比を計算しています。

df = pl.DataFrame(
    {
        "a": [3, 3, 3, 4],
        "b": [4, 12, 6, 7],
        "g": ['A', 'B', 'A', 'B']
    }
)

def ratio_max(args):
    a, b = args
    return a.max() / b.max()

df.group_by('g').agg(
    pl.map_groups(['a', 'b'], ratio_max, returns_scalar=True)
)
shape: (2, 2)
ga
strf64
"B"0.333333
"A"0.5

map_elements#

Expr.map_elements()を使用して演算式の各個値をカスタム関数に渡し、その結果を新しい列として計算することができます。以下の例では、f 関数を使って各値を処理する例です。

def f(x):
    if x > 10:
        return 'large'
    elif x > 5:
        return 'middle'
    else:
        return 'small'

df.with_columns(
    pl.col.b.map_elements(f, return_dtype=pl.String).alias('b_category')
)
shape: (4, 4)
abgb_category
i64i64strstr
34"A""small"
312"B""large"
36"A""middle"
47"B""middle"

Expr.map_elements()は各値を逐一処理するため、データ量が多い場合は演算速度が遅くなる可能性があります。そのため、ベクトル演算やPolarsの組み込み関数で処理できない場合にのみ使用することが推奨されます。例えば、上記の例では、以下のようにベクトル化された条件分岐を使用することもできます:

df.with_columns(
    pl.when(pl.col('b') > 10).then(pl.lit('large'))
      .when(pl.col('b') > 5).then(pl.lit('middle'))
      .otherwise(pl.lit('small'))
      .alias('b_category')
)
shape: (4, 4)
abgb_category
i64i64strstr
34"A""small"
312"B""large"
36"A""middle"
47"B""middle"

agg()over() のコンテキストで map_elements() を使用しても、ユーザー関数には スカラー値 が渡されます。

以下の例では、各要素に対して関数 f が呼ばれ、値が順番に渡されることが確認できます。

def f(x):
    print("---")
    print(x)
    return x

g = df.group_by('g', maintain_order=True)
g.agg(pl.col('a').map_elements(f, return_dtype=pl.Int64))
---
3
---
3
---
3
---
4
shape: (2, 2)
ga
strlist[i64]
"A"[3, 3]
"B"[3, 4]

グループごとのデータをユーザー関数に渡したい場合は、map_batches() を使うか、あるいは .implode().map_elements() を使う必要があります。 注意点として、map_elements()return_dtype 引数は map_batches() と少し意味が異なります。

  • map_batches() では、return_dtypeユーザー関数の戻り値の要素型 を指定します。

  • map_elements() では、return_dtypeユーザー関数の戻り値そのもののデータ型 を指定します。

例えば .implode().map_elements() の場合、ユーザー関数の戻り値は Series になるため、return_dtype には pl.List(...) を指定します。

g.agg(pl.col('a').implode().map_elements(f, return_dtype=pl.List(pl.Int64)))
---
shape: (2,)
Series: '' [i64]
[
	3
	3
]
---
shape: (2,)
Series: '' [i64]
[
	3
	4
]
shape: (2, 2)
ga
strlist[i64]
"A"[3, 3]
"B"[3, 4]
g.agg(pl.col('a').map_batches(f, return_dtype=pl.Int64))
------
shape: (2,)
Series: '' [i64]
[
	3
	4
]

shape: (2,)
Series: '' [i64]
[
	3
	3
]
shape: (2, 2)
ga
strlist[i64]
"A"[3, 3]
"B"[3, 4]

rolling_map#

pl.rolling_mean()などの移動ウィンドウ処理関数で対応できない場合に、rolling_map()を使用してウィンドウ内のデータをカスタムユーザー関数で処理できます。具体的には、以下のコードは、各ウィンドウ内のデータをユーザー関数に渡し、処理した結果を新しい列として追加します。

Tip

pl.Series._s:Seriesオブジェクトの内部データにアクセスするために使用します。これにより、実際のデータやそのメモリのIDを確認できます。

argumentsリストに、各ウィンドウ内のデータとその Series オブジェクトのメモリIDが記録されます。効率的にメモリを利用するため、同じSeriesオブジェクトが複数回渡されていることが確認できます。

df = pl.DataFrame(
    {
        "a": [2, 4, 0, 3, 1, 6]
    }
)

arguments = []
def f(s):
    arguments.append((id(s._s), s.to_list()))
    return s.mean()

df.select(
    pl.col('a').rolling_map(f, window_size=3, center=True, min_samples=1)
)

for id_, arg in arguments:
    print(id_, arg)
2704225393120 [2, 4]
2704225393696 [2, 4, 0]
2704225393696 [4, 0, 3]
2704225393888 [0, 3, 1]
2704225389904 [3, 1, 6]
2704225393696 [1, 6]

ufunc#

NumPyufunc関数を直接使用して列あるいは演算式に対して演算を行うことができます。例えば、np.hypot は二つの値の平方根の和を計算する関数で、Polarsでは次のように使用することができます。

df = pl.DataFrame(
    {
        "a": [3, 3, 3, 4],
        "b": [4, 12, 6, 7],
        "g": ['A', 'B', 'A', 'B']
    }
)

df.with_columns(
    np.hypot(pl.col("a"), pl.col("b")).alias("c")
)
shape: (4, 4)
abgc
i64i64strf64
34"A"5.0
312"B"12.369317
36"A"6.708204
47"B"8.062258

np.hypot()を使って列を計算する際、Polarsの内部でどのように処理されるかを理解するために、次のように演算式を見てみます。この演算式はa列を構造体列に変換し、b列を”argument_1”として追加します。構造体列に対して、python_udf()で処理します。この関数でNumPyのnp.hypot()を呼び出します。

np.hypot(pl.col("a"), pl.col("b"))
col("a").as_struct([col("b").alias("argument_1")]).python_udf()

列名とフィールド名変更#

Expr.name.map()

列名をユーザー関数で変更します。

Expr.name.map_fields()

Structのfield名をユーザー関数で変更します。

def rename(n):
    return n.upper() + n
    
df.select(pl.all().name.map(rename))
shape: (4, 3)
AaBbGg
i64i64str
34"A"
312"B"
36"A"
47"B"

DataFrame.map_rows()

行をTupleとして、ユーザー関数に渡します。

def f(row):
    a, b, g = row
    return f"{g}:{a + b}", f"{g}:{a - b}"
    
df.map_rows(f).rename({"column_0": "add", "column_1": "sub"})
shape: (4, 2)
addsub
strstr
"A:7""A:-1"
"B:15""B:-9"
"A:9""A:-3"
"B:11""B:-3"

辞書で行を処理した場合は、DataFrame.iter_rows()を使います。

for row in df.iter_rows(named=True):
    print(row)
{'a': 3, 'b': 4, 'g': 'A'}
{'a': 3, 'b': 12, 'g': 'B'}
{'a': 3, 'b': 6, 'g': 'A'}
{'a': 4, 'b': 7, 'g': 'B'}