遅延演算#
import polars as pl
import numpy as np
import threading
from helper.jupyter import row
Polarsの遅延演算は、クエリを即時実行せず「実行計画」として蓄積し、最適化後に一括実行する仕組みです。主なメリットは以下です。
自動最適化: 不要な計算の省略・操作順序の最適化
メモリ効率: 必要なタイミングでのみ処理実行
大規模データ対応: メモリ不足のリスク低減
基本フロー#
DataFrame.lazy()
メソッドでLazyFrame
に変換するか、scan_*()
関数で直接ファイルからLazyFrame
を取得します。フィルタや集計などの処理チェーンを構築します。
collect()
で最終実行するか、explain()
で実行計画を確認します。
以下の例では、df.lazy()
で LazyFrame
に変換した後、DataFrame
と同じメソッドを使って処理チェーンを構築します。
df = pl.DataFrame({
"product": ["A", "B", "C", "A", "B"],
"category": ["X", "Y", "X", "Y", "X"],
"price": [100, 200, 150, 300, 250],
"quantity": [5, 3, 4, 2, 6]
})
lazy_operations = (
df.lazy()
.filter(pl.col("price") > 150)
.filter(pl.col("price") < 280)
.group_by("category")
.agg(pl.col("quantity").sum().alias("total_quantity"))
)
explain()
を使うと、実行計画を分析できます。optimized
引数を True
にすると最適化後の計画を確認できます。以下の出力例のように、最適化後は 2 つの FILTER
が 1 つの SELECTION
に統合されます。
print(lazy_operations.explain(optimized=False))
print(lazy_operations.explain(optimized=True))
AGGREGATE
[col("quantity").sum().alias("total_quantity")] BY [col("category")] FROM
FILTER [(col("price")) < (280)] FROM
FILTER [(col("price")) > (150)] FROM
DF ["product", "category", "price", "quantity"]; PROJECT */4 COLUMNS; SELECTION: None
AGGREGATE
[col("quantity").sum().alias("total_quantity")] BY [col("category")] FROM
DF ["product", "category", "price", "quantity"]; PROJECT 3/4 COLUMNS; SELECTION: [([(col("price")) > (150)]) & ([(col("price")) < (280)])]
演算結果を取得するには collect()
メソッドを使用します。
lazy_operations.collect()
category | total_quantity |
---|---|
str | i64 |
"X" | 6 |
"Y" | 3 |
collect()
実行後も処理チェーンは保持されるため、sort()
を追加して再実行することもできます。
lazy_operations.sort("total_quantity", descending=True).collect()
category | total_quantity |
---|---|
str | i64 |
"X" | 6 |
"Y" | 3 |
次のテーブルで、遅延演算と即時実行の比較をまとめます。
特徴 |
遅延演算 (Lazy) |
即時実行 (Eager) |
---|---|---|
実行タイミング |
|
各操作ごとに即時実行 |
最適化 |
自動的に実施 |
なし |
メモリ効率 |
高い |
低い(中間データ保持) |
主な用途 |
大規模データ・複雑処理 |
小規模データ・簡易処理 |
ストリーミング処理#
Polarsのストリーミング処理は、大規模なデータセットを効率的に処理するための仕組みで、データを「チャンク」と呼ばれる小さな部分に分割し、逐次処理を行います。この手法により、メモリ消費を抑えながら高速なデータ処理が可能です。
n = 1000
df = pl.LazyFrame({
"A":np.random.randn(n),
"B":np.random.randn(n),
"C":np.random.randint(0, 10, n)
})
演算をストリーミングで処理できるかどうかを調べるには、.explain(streaming=True)
を使用します。STREAMING:
の下に表示される演算は、すべてストリーミングで処理可能であることを示しています。
df2 = df.filter(pl.col('A') > 0.5).group_by('C').agg(pl.col('B').mean())
print(df2.explain(streaming=True))
STREAMING:
AGGREGATE
[col("B").mean()] BY [col("C")] FROM
FILTER [(col("A")) > (0.5)] FROM
DF ["A", "B", "C"]; PROJECT 3/3 COLUMNS
次の出力では、group_by()
のmaintain_order=True
を設定した場合、group_by()
がストリーミング処理できなくなることがわかります。
df3 = df.filter(pl.col('A') > 0.5).group_by('C', maintain_order=True).agg(pl.col('B').mean())
print(df3.explain(streaming=True))
AGGREGATE
[col("B").mean()] BY [col("C")] FROM
STREAMING:
FILTER [(col("A")) > (0.5)] FROM
DF ["A", "B", "C"]; PROJECT 3/3 COLUMNS
ストリーミング処理を使って結果を計算する場合、.collect(streaming=True)
を呼び出します。この方法では、可能であればストリーミング処理が適用され、効率的に結果を計算します。
row(df2.collect(streaming=True), df3.collect(streaming=True))
shape: (10, 2)
|
shape: (10, 2)
|
次のコードでは、ストリーミング処理を観察するために、map_batches()
を使用してデータをユーザー関数に渡します。このユーザー関数内で、データの名前、データの長さ、および処理しているスレッドを出力します。
Tip
agg_list
引数がデフォルト値のFalse
の場合、map_batches()
の演算をストリーミング処理することはできません。
lock = threading.Lock()
def f(s):
with lock:
print(s.name, s.shape, threading.current_thread())
return s
df4 = df.select(pl.col('A', 'B').map_batches(f, agg_list=True)).filter(pl.col('A') > 0.5)
print(df4.explain(streaming=True))
STREAMING:
FILTER [(col("A")) > (0.5)] FROM
SELECT [col("A").map_list(), col("B").map_list()] FROM
DF ["A", "B", "C"]; PROJECT 2/3 COLUMNS
次のコードを実行すると、以下のような出力から、A列とB列がそれぞれおよそ長さ83のチャンクに分割され、異なるスレッドで並列処理されていることが確認できます。
df4.collect(streaming=True);
A (83,) <_DummyThread(Dummy-13, started daemon 32484)>
A (83,) <_DummyThread(Dummy-3, started daemon 5548)>
A (83,) <_DummyThread(Dummy-11, started daemon 31196)>
A (83,) <_DummyThread(Dummy-14, started daemon 29732)>
B (83,) <_DummyThread(Dummy-14, started daemon 29732)>
A (83,) <_DummyThread(Dummy-12, started daemon 32148)>
A (83,) <_DummyThread(Dummy-6, started daemon 29448)>
B (83,) <_DummyThread(Dummy-6, started daemon 29448)>
A (83,) <_DummyThread(Dummy-10, started daemon 24416)>
B (83,) <_DummyThread(Dummy-10, started daemon 24416)>
A (83,) <_DummyThread(Dummy-9, started daemon 30848)>
B (83,) <_DummyThread(Dummy-9, started daemon 30848)>
B (83,) <_DummyThread(Dummy-13, started daemon 32484)>
B (83,) <_DummyThread(Dummy-3, started daemon 5548)>
B (83,) <_DummyThread(Dummy-11, started daemon 31196)>
A (87,) <_DummyThread(Dummy-7, started daemon 25668)>
B (87,) <_DummyThread(Dummy-7, started daemon 25668)>
A (83,) <_DummyThread(Dummy-8, started daemon 32692)>
A (83,) <_DummyThread(Dummy-4, started daemon 34336)>
B (83,) <_DummyThread(Dummy-4, started daemon 34336)>
A (83,) <_DummyThread(Dummy-5, started daemon 25324)>
B (83,) <_DummyThread(Dummy-5, started daemon 25324)>
B (83,) <_DummyThread(Dummy-12, started daemon 32148)>
B (83,) <_DummyThread(Dummy-8, started daemon 32692)>
また、streaming
引数を省略した場合(デフォルト値はFalse
)、A列とB列はチャンクに分割されず、それぞれ全体を別々のスレッドで処理することがわかります。
df4.collect();
A (1000,) <_DummyThread(Dummy-7, started daemon 25668)>
B (1000,) <_DummyThread(Dummy-13, started daemon 32484)>
scan_*()
関数を使用してファイルからデータを読み込んで処理する場合、事前にデータの長さがわからないため、次のコードの出力からも分かるように、チャンクを均等に分割することができません。
df.collect().write_csv('data/streaming_test.csv')
df5 = pl.scan_csv('data/streaming_test.csv', cache=False, ).select(pl.col('A', 'B').map_batches(f, agg_list=True)).filter(pl.col('A') > 0.5)
print(df5.explain(streaming=True))
df5.collect(streaming=True, );
STREAMING:
FILTER [(col("A")) > (0.5)] FROM
SELECT [col("A").map_list(), col("B").map_list()] FROM
Csv SCAN [data/streaming_test.csv]
PROJECT 2/3 COLUMNS
A (99,) <_DummyThread(Dummy-6, started daemon 29448)>
B (99,) <_DummyThread(Dummy-6, started daemon 29448)>
A (99,) <_DummyThread(Dummy-10, started daemon 24416)>
B (99,) <_DummyThread(Dummy-10, started daemon 24416)>
A (98,) <_DummyThread(Dummy-4, started daemon 34336)>
B (98,) <_DummyThread(Dummy-4, started daemon 34336)>
A (99,) <_DummyThread(Dummy-7, started daemon 25668)>
A (99,) <_DummyThread(Dummy-11, started daemon 31196)>
B (99,) <_DummyThread(Dummy-11, started daemon 31196)>
A (99,) <_DummyThread(Dummy-8, started daemon 32692)>
B (99,) <_DummyThread(Dummy-8, started daemon 32692)>
A (99,) <_DummyThread(Dummy-14, started daemon 29732)>
B (99,) <_DummyThread(Dummy-14, started daemon 29732)>
A (99,) <_DummyThread(Dummy-13, started daemon 32484)>
B (99,) <_DummyThread(Dummy-13, started daemon 32484)>
A (99,) <_DummyThread(Dummy-9, started daemon 30848)>
B (99,) <_DummyThread(Dummy-9, started daemon 30848)>
A (11,) <_DummyThread(Dummy-12, started daemon 32148)>
B (11,) <_DummyThread(Dummy-12, started daemon 32148)>
A (99,) <_DummyThread(Dummy-3, started daemon 5548)>
B (99,) <_DummyThread(Dummy-3, started daemon 5548)>
B (99,) <_DummyThread(Dummy-7, started daemon 25668)>