遅延演算#

import polars as pl
import numpy as np
import threading
from helper.jupyter import row

Polarsの遅延演算は、クエリを即時実行せず「実行計画」として蓄積し、最適化後に一括実行する仕組みです。主なメリットは以下です。

  • 自動最適化: 不要な計算の省略・操作順序の最適化

  • メモリ効率: 必要なタイミングでのみ処理実行

  • 大規模データ対応: メモリ不足のリスク低減

基本フロー#

  1. DataFrame.lazy() メソッドで LazyFrame に変換するか、scan_*() 関数で直接ファイルから LazyFrame を取得します。

  2. フィルタや集計などの処理チェーンを構築します。

  3. collect() で最終実行するか、explain() で実行計画を確認します。

以下の例では、df.lazy()LazyFrame に変換した後、DataFrame と同じメソッドを使って処理チェーンを構築します。

df = pl.DataFrame({
    "product": ["A", "B", "C", "A", "B"],
    "category": ["X", "Y", "X", "Y", "X"],
    "price": [100, 200, 150, 300, 250],
    "quantity": [5, 3, 4, 2, 6]
})

lazy_operations = (
    df.lazy()
    .filter(pl.col("price") > 150)
    .filter(pl.col("price") < 280)
    .group_by("category")    
    .agg(pl.col("quantity").sum().alias("total_quantity"))
)

explain() を使うと、実行計画を分析できます。optimized 引数を True にすると最適化後の計画を確認できます。以下の出力例のように、最適化後は 2 つの FILTER が 1 つの SELECTION に統合されます。

print(lazy_operations.explain(optimized=False))
print(lazy_operations.explain(optimized=True))
AGGREGATE
	[col("quantity").sum().alias("total_quantity")] BY [col("category")] FROM
  FILTER [(col("price")) < (280)] FROM
    FILTER [(col("price")) > (150)] FROM
      DF ["product", "category", "price", "quantity"]; PROJECT */4 COLUMNS; SELECTION: None
AGGREGATE
	[col("quantity").sum().alias("total_quantity")] BY [col("category")] FROM
  DF ["product", "category", "price", "quantity"]; PROJECT 3/4 COLUMNS; SELECTION: [([(col("price")) > (150)]) & ([(col("price")) < (280)])]

演算結果を取得するには collect() メソッドを使用します。

lazy_operations.collect()
shape: (2, 2)
categorytotal_quantity
stri64
"X"6
"Y"3

collect() 実行後も処理チェーンは保持されるため、sort() を追加して再実行することもできます。

lazy_operations.sort("total_quantity", descending=True).collect()
shape: (2, 2)
categorytotal_quantity
stri64
"X"6
"Y"3

次のテーブルで、遅延演算と即時実行の比較をまとめます。

特徴

遅延演算 (Lazy)

即時実行 (Eager)

実行タイミング

collect() で一括実行

各操作ごとに即時実行

最適化

自動的に実施

なし

メモリ効率

高い

低い(中間データ保持)

主な用途

大規模データ・複雑処理

小規模データ・簡易処理

ストリーミング処理#

Polarsのストリーミング処理は、大規模なデータセットを効率的に処理するための仕組みで、データを「チャンク」と呼ばれる小さな部分に分割し、逐次処理を行います。この手法により、メモリ消費を抑えながら高速なデータ処理が可能です。

n = 1000
df = pl.LazyFrame({
    "A":np.random.randn(n),
    "B":np.random.randn(n),
    "C":np.random.randint(0, 10, n)
})

演算をストリーミングで処理できるかどうかを調べるには、.explain(streaming=True)を使用します。STREAMING:の下に表示される演算は、すべてストリーミングで処理可能であることを示しています。

df2 = df.filter(pl.col('A') > 0.5).group_by('C').agg(pl.col('B').mean())
print(df2.explain(streaming=True))
STREAMING:
  AGGREGATE
  	[col("B").mean()] BY [col("C")] FROM
    FILTER [(col("A")) > (0.5)] FROM
      DF ["A", "B", "C"]; PROJECT 3/3 COLUMNS

次の出力では、group_by()maintain_order=Trueを設定した場合、group_by()がストリーミング処理できなくなることがわかります。

df3 = df.filter(pl.col('A') > 0.5).group_by('C', maintain_order=True).agg(pl.col('B').mean())
print(df3.explain(streaming=True))
AGGREGATE
	[col("B").mean()] BY [col("C")] FROM
  STREAMING:
    FILTER [(col("A")) > (0.5)] FROM
      DF ["A", "B", "C"]; PROJECT 3/3 COLUMNS

ストリーミング処理を使って結果を計算する場合、.collect(streaming=True)を呼び出します。この方法では、可能であればストリーミング処理が適用され、効率的に結果を計算します。

row(df2.collect(streaming=True), df3.collect(streaming=True))
shape: (10, 2)
CB
i32f64
4-0.207007
70.241421
9-0.114259
0-0.081423
3-0.15736
2-0.22622
10.301292
5-0.095738
6-0.17092
80.130623
shape: (10, 2)
CB
i32f64
3-0.15736
0-0.081423
9-0.114259
2-0.22622
4-0.207007
70.241421
80.130623
5-0.095738
6-0.17092
10.301292

次のコードでは、ストリーミング処理を観察するために、map_batches()を使用してデータをユーザー関数に渡します。このユーザー関数内で、データの名前、データの長さ、および処理しているスレッドを出力します。

Tip

agg_list引数がデフォルト値のFalseの場合、map_batches()の演算をストリーミング処理することはできません。

lock = threading.Lock()
def f(s):
    with lock:
        print(s.name, s.shape, threading.current_thread())
        return s

df4 = df.select(pl.col('A', 'B').map_batches(f, agg_list=True)).filter(pl.col('A') > 0.5)
print(df4.explain(streaming=True))
STREAMING:
  FILTER [(col("A")) > (0.5)] FROM
     SELECT [col("A").map_list(), col("B").map_list()] FROM
      DF ["A", "B", "C"]; PROJECT 2/3 COLUMNS

次のコードを実行すると、以下のような出力から、A列とB列がそれぞれおよそ長さ83のチャンクに分割され、異なるスレッドで並列処理されていることが確認できます。

df4.collect(streaming=True);
A (83,) <_DummyThread(Dummy-13, started daemon 32484)>
A (83,) <_DummyThread(Dummy-3, started daemon 5548)>
A (83,) <_DummyThread(Dummy-11, started daemon 31196)>
A (83,) <_DummyThread(Dummy-14, started daemon 29732)>
B (83,) <_DummyThread(Dummy-14, started daemon 29732)>
A (83,) <_DummyThread(Dummy-12, started daemon 32148)>
A (83,) <_DummyThread(Dummy-6, started daemon 29448)>
B (83,) <_DummyThread(Dummy-6, started daemon 29448)>
A (83,) <_DummyThread(Dummy-10, started daemon 24416)>
B (83,) <_DummyThread(Dummy-10, started daemon 24416)>
A (83,) <_DummyThread(Dummy-9, started daemon 30848)>
B (83,) <_DummyThread(Dummy-9, started daemon 30848)>
B (83,) <_DummyThread(Dummy-13, started daemon 32484)>
B (83,) <_DummyThread(Dummy-3, started daemon 5548)>
B (83,) <_DummyThread(Dummy-11, started daemon 31196)>
A (87,) <_DummyThread(Dummy-7, started daemon 25668)>
B (87,) <_DummyThread(Dummy-7, started daemon 25668)>
A (83,) <_DummyThread(Dummy-8, started daemon 32692)>
A (83,) <_DummyThread(Dummy-4, started daemon 34336)>
B (83,) <_DummyThread(Dummy-4, started daemon 34336)>
A (83,) <_DummyThread(Dummy-5, started daemon 25324)>
B (83,) <_DummyThread(Dummy-5, started daemon 25324)>
B (83,) <_DummyThread(Dummy-12, started daemon 32148)>
B (83,) <_DummyThread(Dummy-8, started daemon 32692)>

また、streaming引数を省略した場合(デフォルト値はFalse)、A列とB列はチャンクに分割されず、それぞれ全体を別々のスレッドで処理することがわかります。

df4.collect();
A (1000,) <_DummyThread(Dummy-7, started daemon 25668)>
B (1000,) <_DummyThread(Dummy-13, started daemon 32484)>

scan_*()関数を使用してファイルからデータを読み込んで処理する場合、事前にデータの長さがわからないため、次のコードの出力からも分かるように、チャンクを均等に分割することができません。

df.collect().write_csv('data/streaming_test.csv')
df5 = pl.scan_csv('data/streaming_test.csv', cache=False, ).select(pl.col('A', 'B').map_batches(f, agg_list=True)).filter(pl.col('A') > 0.5)
print(df5.explain(streaming=True))
df5.collect(streaming=True, );
STREAMING:
  FILTER [(col("A")) > (0.5)] FROM
     SELECT [col("A").map_list(), col("B").map_list()] FROM
      Csv SCAN [data/streaming_test.csv]
      PROJECT 2/3 COLUMNS
A (99,) <_DummyThread(Dummy-6, started daemon 29448)>
B (99,) <_DummyThread(Dummy-6, started daemon 29448)>
A (99,) <_DummyThread(Dummy-10, started daemon 24416)>
B (99,) <_DummyThread(Dummy-10, started daemon 24416)>
A (98,) <_DummyThread(Dummy-4, started daemon 34336)>
B (98,) <_DummyThread(Dummy-4, started daemon 34336)>
A (99,) <_DummyThread(Dummy-7, started daemon 25668)>
A (99,) <_DummyThread(Dummy-11, started daemon 31196)>
B (99,) <_DummyThread(Dummy-11, started daemon 31196)>
A (99,) <_DummyThread(Dummy-8, started daemon 32692)>
B (99,) <_DummyThread(Dummy-8, started daemon 32692)>
A (99,) <_DummyThread(Dummy-14, started daemon 29732)>
B (99,) <_DummyThread(Dummy-14, started daemon 29732)>
A (99,) <_DummyThread(Dummy-13, started daemon 32484)>
B (99,) <_DummyThread(Dummy-13, started daemon 32484)>
A (99,) <_DummyThread(Dummy-9, started daemon 30848)>
B (99,) <_DummyThread(Dummy-9, started daemon 30848)>
A (11,) <_DummyThread(Dummy-12, started daemon 32148)>
B (11,) <_DummyThread(Dummy-12, started daemon 32148)>
A (99,) <_DummyThread(Dummy-3, started daemon 5548)>
B (99,) <_DummyThread(Dummy-3, started daemon 5548)>
B (99,) <_DummyThread(Dummy-7, started daemon 25668)>