複数のデータフレームの結合#

複数のデータフレームを結合する操作は、データの統合や分析において非常に重要な手段です。Polarsでは、さまざまな結合方法が提供されており、データの構造に応じて最適な方法を選択することができます。以下では、Polarsで利用できる代表的な結合手法であるconcatjoinjoin_asofjoin_whereとデータフレームの結合メソッドについて紹介します。

import numpy as np
import polars as pl
from helper.jupyter import row

concat#

pl.concat()を使用すると、複数のデータフレームを縦または横に結合できます。結合方法は引数howで指定され、以下の5種類の結合方法があります。

  • verticalおよびvertical_relaxed: 縦方向の結合

  • horizontal: 横方向の結合

  • diagonalおよびdiagonal_relaxed: 縦横両方向の結合

  • align: 縦横両方向の結合ですが、データをキーで整列してから結合を行います

以下は、次の二つのデータフレームを使って、上記の結合方法について詳しく説明します。

df1 = pl.DataFrame({"x":[1, 2, 3], "y":[2, 3, 1]})
df2 = pl.DataFrame({"x":[6, 2, 1, 5], "y":[12, 3, 2, 4]})
row(df1, df2)
shape: (3, 2)
xy
i64i64
12
23
31
shape: (4, 2)
xy
i64i64
612
23
12
54

縦結合#

以下のように、すべてのデータフレームの列名とデータ型が一致する場合は、verticalで縦に結合します。

pl.concat([df1, df2], how='vertical')
shape: (7, 2)
xy
i64i64
12
23
31
612
23
12
54

列名が一致するがデータ型が一致しない場合は、vertical_relaxedを使用して縦に結合します。この場合、結果のデータ型は上位のデータ型が採用されます。以下のコード例では、df2x列をFloat64型にキャストしてから結合しています。このように、x列のデータ型がFloat64に統一され、縦に結合されます。

pl.concat([
    df1, 
    df2.with_columns(pl.col('x').cast(pl.Float64))
    ], 
    how='vertical_relaxed')
shape: (7, 2)
xy
f64i64
1.02
2.03
3.01
6.012
2.03
1.02
5.04

Pandasのように縦結合するとき、各データフレームにキーを付ける方法についてのプログラム例を以下に示します。この方法では、df1df2にそれぞれキーを付けてから縦に結合します。 プログラムには、key列を追加して各データフレームの行にキーを付けてから縦に結合することで、元のデータフレームを識別できるようにしています。

data = {"A":df1, "B":df2}
pl.concat([
    df.select(pl.lit(key).alias("key"), pl.all()) 
    for key, df in data.items()
])
shape: (7, 3)
keyxy
stri64i64
"A"12
"A"23
"A"31
"B"612
"B"23
"B"12
"B"54

横結合#

列名が異なるデータフレームを横に結合するには、horizontalを使用します。以下のプログラムでは、df1df2の列名を2種類の方法でリネームし、横結合します。このように、df1の列名に1を、df2の列名に2を付けて横に結合します。df2df1より行数が多いため、df1に存在しない行にはnullが補完されます。

pl.concat([
    df1.rename(lambda name:f"{name}1"), 
    df2.select(pl.all().name.suffix("2"))
], how='horizontal')
shape: (4, 4)
x1y1x2y2
i64i64i64i64
12612
2323
3112
nullnull54

一部の列名が同じで、一部の列名が異なる場合、diagonaldiagonal_relaxedを使用して結合できます。diagonal_relaxedは自動的に上位のデータ型を採用します。次のプログラムでは、df1u列が追加され、df2v列が追加され、diagonalで二つのデータフレームを結合します。列名が一致するデータは縦に結合し、一致しない列はNULLで欠損値を表します。

縦と横結合#

dfs = [
    df1.with_columns(u=pl.col('x') + pl.col('y')),
    df2.with_columns(v=pl.col('x') * pl.col('y'))
]
pl.concat(dfs, how='diagonal')
shape: (7, 4)
xyuv
i64i64i64i64
123null
235null
314null
612null72
23null6
12null2
54null20

整列結合#

align結合は、diagonalと似ていますが、列名が一致するデータをキーとして集合化し、他の列の値を統合します。以下は、df1df2に追加した列を使ってalignで結合する例です。このように、align結合では共通のxyの値をキーとして行をマージし、他の列の値を統合しています。例えば、x=1, y=2の行はu=3v=2が統合されて1行になります。

pl.concat(dfs, how="align")
shape: (5, 4)
xyuv
i64i64i64i64
1232
2356
314null
54null20
612null72

pl.align_frames()を使用すると、複数のデータフレームを指定した列で整列させることができます。以下の例では、df1df2xおよびy列で整列させています。整列後の各データフレームの行数は同じで、指定された列の値に基づいて他の列が整列されています。

row(*pl.align_frames(*dfs, on=['x', 'y']))
shape: (5, 3)
xyu
i64i64i64
123
235
314
54null
612null
shape: (5, 3)
xyv
i64i64i64
122
236
31null
5420
61272

join#

Polarsjoin()メソッドは、SQLのように2つのデータフレームを結合するための方法を提供します。joinは、異なる結合戦略を使用して、2つのデータフレームの対応する行をマッチさせることができます。

df.join(
    other,                # 結合するもう1つのDataFrame
    on=None,              # 両方のDataFrameの結合に使う列名または式
    how='inner',          # 結合方法(デフォルトは'inner')
    left_on=None,         # 左側のDataFrameの結合列
    right_on=None,        # 右側のDataFrameの結合列
    suffix='_right',      # 重複した列名に付ける接尾辞
    validate='m:m',       # 結合タイプの検証 ('m:m', 'm:1', '1:m', '1:1')
    join_nulls=False,     # Null値もマッチさせるかどうか
    coalesce=None         # 共通のキー列に対してnull値を埋めるかどうか
)

引数howで結合方法を指定します。

  • inner: 両方のテーブルで一致する行を返す。

  • left: 左のテーブルのすべての行と、右のテーブルの一致する行を返す。

  • right: 右のテーブルのすべての行と、左のテーブルの一致する行を返す。

  • full: 左右どちらかに一致する行をすべて返す。

  • semi: 左テーブルから一致する行を返すが、右のテーブルからは列を返さない。

  • anti: 左テーブルの一致しない行を返す。

df_left = pl.DataFrame({
    "id": [1, 2, 3, 4],
    "name": ["Alice", "Bob", "Charlie", "David"]
})

df_right = pl.DataFrame({
    "id": [3, 4, 5],
    "age": [23, 30, 40]
})

inner#

両方のデータフレームに存在するidに基づいて、内部結合を行います。次の例では、idが3と4に一致する行のみが返されました。

row(
    df_left, df_right, 
    df_left.join(df_right, on="id", how="inner")    
)
shape: (4, 2)
idname
i64str
1"Alice"
2"Bob"
3"Charlie"
4"David"
shape: (3, 2)
idage
i64i64
323
430
540
shape: (2, 3)
idnameage
i64stri64
3"Charlie"23
4"David"30

leftとright#

左のデータフレームのすべての行を返し、右のデータフレームに一致するデータがあれば、それも含めます。次の例では、idが1と2の行は右に対応するデータがないため、agenullです。

row(
    df_left, df_right,
    df_left.join(df_right, on="id", how="left")
)
shape: (4, 2)
idname
i64str
1"Alice"
2"Bob"
3"Charlie"
4"David"
shape: (3, 2)
idage
i64i64
323
430
540
shape: (4, 3)
idnameage
i64stri64
1"Alice"null
2"Bob"null
3"Charlie"23
4"David"30
row(
    df_left, df_right,
    df_left.join(df_right, on="id", how="right")
)
shape: (4, 2)
idname
i64str
1"Alice"
2"Bob"
3"Charlie"
4"David"
shape: (3, 2)
idage
i64i64
323
430
540
shape: (3, 3)
nameidage
stri64i64
"Charlie"323
"David"430
null540

full#

両方のデータフレームのすべての行を返し、どちらかに存在するデータがあれば、それを含めます。左と右のどちらからデータを取得したかを区別するために、結果には二つの結合列が作成されます。右側の結合列には、重複を避けるために_rightという接尾辞が追加されます。結果から、idが1と2の行は左側のデータにのみ存在し、idが5の行は右側のデータにのみ存在することがわかります。

row(
    df_left, df_right,
    df_left.join(df_right, on="id", how="full")
)
shape: (4, 2)
idname
i64str
1"Alice"
2"Bob"
3"Charlie"
4"David"
shape: (3, 2)
idage
i64i64
323
430
540
shape: (5, 4)
idnameid_rightage
i64stri64i64
1"Alice"nullnull
2"Bob"nullnull
3"Charlie"323
4"David"430
nullnull540

coalesce引数をTrueに設定すると、これらの2つの列は1つにまとめられます。

row(
    df_left, df_right,
    df_left.join(df_right, on="id", how="full", coalesce=True)
)
shape: (4, 2)
idname
i64str
1"Alice"
2"Bob"
3"Charlie"
4"David"
shape: (3, 2)
idage
i64i64
323
430
540
shape: (5, 3)
idnameage
i64stri64
1"Alice"null
2"Bob"null
3"Charlie"23
4"David"30
5null40

semiとanti#

semiは右側に存在する行を出力します。antiは右側に存在しない行を出力します。semiとantiの結果には、右側の列は含まれません。

row(
    df_left, df_right,
    df_left.join(df_right, on="id", how="semi")
)
shape: (4, 2)
idname
i64str
1"Alice"
2"Bob"
3"Charlie"
4"David"
shape: (3, 2)
idage
i64i64
323
430
540
shape: (2, 2)
idname
i64str
3"Charlie"
4"David"
row(
    df_left, df_right,
    df_left.join(df_right, on="id", how="anti")
)
shape: (4, 2)
idname
i64str
1"Alice"
2"Bob"
3"Charlie"
4"David"
shape: (3, 2)
idage
i64i64
323
430
540
shape: (2, 2)
idname
i64str
1"Alice"
2"Bob"

cross#

crossは、2つのデータフレームのデカルト積を出力します。つまり、左側のすべての行と右側のすべての行の組み合わせを結合します。この場合は、on引数の指定は不要になります。

row(
    df_left, df_right,
    df_left.join(df_right, how="cross")
)
shape: (4, 2)
idname
i64str
1"Alice"
2"Bob"
3"Charlie"
4"David"
shape: (3, 2)
idage
i64i64
323
430
540
shape: (12, 4)
idnameid_rightage
i64stri64i64
1"Alice"323
1"Alice"430
1"Alice"540
2"Bob"323
2"Bob"430
3"Charlie"430
3"Charlie"540
4"David"323
4"David"430
4"David"540

validate#

join を使ってデータフレームを結合する際、結合キーに対応する左側と右側のデータ行数によって、次の 4 つのパターンが考えられます。

  • 1:1(一対一): 左右の結合キーが どちらもユニーク である必要があります。(例: 社員 ID と 社員証番号)

  • 1:m(一対多): 左のキーは ユニーク だが、右のキーは 重複可。(例: 部署と所属社員)

  • m:1(多対一): 左のキーは 重複可 だが、右のキーは ユニーク。(例: 注文明細と商品マスタ)

  • m:m(多対多): 両方のキーに 重複がある ことを許容。(デフォルトの動作)

マッチングする行が複数ある場合、デフォルトでは マッチしたすべての組み合わせ(デカルト積) が結果に含まれます。

join()validate 引数を使用すると、指定したマッチング関係を制約できます。設定した関係に違反する場合はエラーが発生します。

次の 2 つのデータフレームには、id == 3 の行がそれぞれ 2 つずつ含まれています。

df_left = pl.DataFrame({
    "id": [1, 2, 3, 4, 3],
    "name": ["Alice", "Bob", "Charlie", "David", "charlie"]
})

df_right = pl.DataFrame({
    "id": [3, 4, 5, 3],
    "age": [23, 30, 40, 46]
})

このまま id をキーにして join() を実行すると、id == 3 の行は デカルト積 となり、結果には 4 行が生成されます。

df_left.join(df_right, on="id")
shape: (5, 3)
idnameage
i64stri64
3"Charlie"23
3"Charlie"46
4"David"30
3"charlie"23
3"charlie"46

validate="1:1" を指定すると、結合キーに対して 一対一の関係 が満たされない場合にエラーとなり、データの整合性をチェックできます。

%%capture_except
df_left.join(df_right, on="id", validate='1:1')
ComputeError: join keys did not fulfill 1:1 validation

update#

DataFrame.updateメソッドは、Polarsにおいてインデックス列を基に既存のデータフレームを更新するために使用されます。引数onで指定された列が一致する行に対して、他の列のデータを更新します。

以下のコードは、dfdf_updateid列が一致する行を基に、df_updateの他の列(xy)の値をdfに更新する例です。

df = pl.DataFrame({
    "id": [1, 2, 3, 4],
    "x": [10, 20, 30, 40],
    "y": [40, 30, 20, 10]
})

df_update = pl.DataFrame({
    "id": [1, 3],
    "x": [100, 300],
    "y": [-10, -20],
})

df_res = df.update(df_update, on='id')
row(df, df_update, df_res)
shape: (4, 3)
idxy
i64i64i64
11040
22030
33020
44010
shape: (2, 3)
idxy
i64i64i64
1100-10
3300-20
shape: (4, 3)
idxy
i64i64i64
1100-10
22030
3300-20
44010

join_asof#

join_asofは、時間や数値のような連続的なデータに基づいて2つのDataFrameを「概ね一致」させて結合するメソッドです。これは、正確な一致ではなく、片方の値がもう片方の値の近くにある場合に使われます。主に、時系列データのような順序のあるデータで利用されます。

join_asofは通常、次のような状況で使われます:

  • 片方のデータが特定の時間に対するスナップショットを持ち、もう片方がその時間に最も近い値を持っている場合。

  • “前方一致”または”後方一致”など、指定された方向に最も近いデータを探す場合。

import polars as pl

df1 = pl.DataFrame(
    {
        "time": [1, 5, 10, 15, 20],
        "event": ["A", "B", "C", "D", "E"],
    }
)

df2 = pl.DataFrame(
    {
        "time": [2, 6, 12, 18],
        "price": [100, 105, 110, 115],
    }
)

result = df1.join_asof(df2, on="time", strategy="backward")
row(df1, df2, result)
shape: (5, 2)
timeevent
i64str
1"A"
5"B"
10"C"
15"D"
20"E"
shape: (4, 2)
timeprice
i64i64
2100
6105
12110
18115
shape: (5, 3)
timeeventprice
i64stri64
1"A"null
5"B"100
10"C"105
15"D"110
20"E"115

df1の各行に対して、df2"time"列で最も近くて「過去または現在の時間」にあたる行を結合します。つまり、df1の各行に対して、df2でその"time"に一番近い過去の"price"の値を結合します。strategy="backward"は、df1"time"の値に対して、それよりも過去または同時刻のdf2の値を選ぶという戦略です。もう一つのオプションに"forward"があり、これは未来の値を選択します。

join_where#

join_whereでは二つのDataFrameの列同士の比較条件を指定して、それに基づいて結合を行います。例えば、以下のコードでは、df1のtime列の値が、df2のtime_span列に含まれる時間範囲内にある場合に結合が行われます。複数の条件式がある場合は、それらすべての条件式を満たす場合にのみ結合が行われます。

df1 = pl.DataFrame(
    {
        "id": [100, 101, 102],
        "time": [120, 140, 160],
    }
)
df2 = pl.DataFrame(
    {
        "t_id": [404, 498, 676, 742],
        "time_span": [(100, 110), (110, 130), (90, 100), (150, 170)],
    }
)
df_res = df1.join_where(
    df2,
    pl.col('time') >= pl.col('time_span').list.get(0),
    pl.col('time') <= pl.col('time_span').list.get(1)
)
row(df1, df2, df_res)
shape: (3, 2)
idtime
i64i64
100120
101140
102160
shape: (4, 2)
t_idtime_span
i64list[i64]
404[100, 110]
498[110, 130]
676[90, 100]
742[150, 170]
shape: (2, 4)
idtimet_idtime_span
i64i64i64list[i64]
100120498[110, 130]
102160742[150, 170]

.join_where()はブール演算やis_between()もサポートしています。

df_res1 = df1.join_where(
    df2,
    (pl.col('time') >= pl.col('time_span').list.get(0)) & (pl.col('time') <= pl.col('time_span').list.get(1))
)

df_res2 = df1.join_where(
    df2,
    pl.col('time').is_between(pl.col('time_span').list.get(0), pl.col('time_span').list.get(1))
)
row(df_res1, df_res2)
shape: (2, 4)
idtimet_idtime_span
i64i64i64list[i64]
100120498[110, 130]
102160742[150, 170]
shape: (2, 4)
idtimet_idtime_span
i64i64i64list[i64]
100120498[110, 130]
102160742[150, 170]

.join_where()内部では、二つのデータフレームの行のデカルト積を計算し、その中から条件を満たす行だけをフィルタリングして出力します。そのため、データの件数が多い場合、処理が非常に遅くなる可能性があります。例えば、次のコードでは、二つの点群間で距離が0.01より小さい点のペアを計算します。この処理は計算量が \(N^2\) に比例するため、大きなデータセットでは速度が非常に遅くなります。

n = 4000
np.random.seed(42)
df1 = pl.DataFrame(dict(
    x1=np.random.normal(size=n),
    y1=np.random.normal(size=n),
))
df2 = pl.DataFrame(dict(
    x2=np.random.normal(size=n),
    y2=np.random.normal(size=n),
))
%%time
df_res1 = (
    df1
    .join_where(
        df2, 
        (pl.col('x2') - pl.col('x1'))**2 + (pl.col('y2') - pl.col('y1'))**2 < 0.0001
    )
    .sort('x1', 'y1')
)
CPU times: total: 891 ms
Wall time: 435 ms

SciPyのKDTreeを使用することで、このような処理を高速化できます。KDTreeを利用すると、空間的な距離を効率的に計算し、特定の距離内にある点のペアを高速に見つけることができます。以下のコードでは、tree1.query_ball_tree(tree2, 0.01)を使用して範囲内の点のペアを計算し、結果を基にdf_pairを作成します。その後、行番号をキーとしてdf1df_pairdf2を結合して結果データフレームdf_res2を作成します。

from scipy.spatial import KDTree
%%time
tree1 = KDTree(df1.to_numpy())
tree2 = KDTree(df2.to_numpy())
res = tree1.query_ball_tree(tree2, 0.01)

df_pair = (
    pl.DataFrame(pl.Series('index2', res, dtype=pl.List(pl.UInt32))) # resをデータフレームに変換
    .with_row_index('index1')
    .filter(pl.col('index2').list.len() > 0) # 範囲内の点が存在する行のみを残す
    .explode('index2') # リストを展開して1行1ペアにする
)
df_res2 = (
    df1
    .with_row_index('index1')
    .join(df_pair, on='index1')
    .join(
        df2.with_row_index('index2'), on='index2')
    .drop('index1', 'index2')
    .sort('x1', 'y1')
)
CPU times: total: 31.2 ms
Wall time: 19 ms
row(df_res1, df_pair, df_res2)
shape: (420, 4)
x1y1x2y2
f64f64f64f64
-2.065083-0.70744-2.05916-0.70436
-1.7690760.817306-1.7684150.825853
-1.523187-0.501234-1.514414-0.501647
-1.519346-0.035096-1.527461-0.03116
-1.4785220.932435-1.4823970.94037
1.669070.3888191.6785740.385821
1.696456-0.0387431.692313-0.037887
1.7349370.1429911.7379850.135518
1.831459-0.5327251.831177-0.523104
1.9541570.5682731.9489630.563404
shape: (420, 2)
index1index2
u32u32
101832
102789
411502
44739
452018
39311196
39351035
39641214
3978715
39941588
shape: (420, 4)
x1y1x2y2
f64f64f64f64
-2.065083-0.70744-2.05916-0.70436
-1.7690760.817306-1.7684150.825853
-1.523187-0.501234-1.514414-0.501647
-1.519346-0.035096-1.527461-0.03116
-1.4785220.932435-1.4823970.94037
1.669070.3888191.6785740.385821
1.696456-0.0387431.692313-0.037887
1.7349370.1429911.7379850.135518
1.831459-0.5327251.831177-0.523104
1.9541570.5682731.9489630.563404

resは二重リストで構成されており、各要素はtree1内の各点に対応しています。それぞれの要素には、対応するtree2内で指定範囲内にある点のインデックス(番号)のリストが格納されています。次のコードは、tree1の先頭50点について計算結果を表示し、空リスト(範囲内に点がない場合)をスキップします。

この出力から、tree1の点10は、tree2の点1832と2789が距離0.01以内に存在することが分かります。

for i, index2 in enumerate(res[:50]):
    if index2:
        print(i, index2)
10 [1832, 2789]
41 [1502]
44 [739]
45 [2018]

extendとvstackメソッド#

DataFrame.extend()DataFrame.vstack() は、既存のデータフレームに別のデータフレームの内容を行として追加するメソッドです。

  • extend(): 元のデータフレームを直接変更します(その場で変更)。変更後のデータフレームは返り値として返します。

  • vstack(): in_place 引数を指定することで、元のデータフレームを直接変更する(その場で変更)か、新しいデータフレームを返すかを選べます。

df1 = pl.DataFrame({"a": [1, 2], "b": [3, 4]})
df2 = pl.DataFrame({"a": [5, 6], "b": [7, 8]})

# clone()を使って元のデータフレームを複製
df_extend = df1.clone()
df_extend.extend(df2)  # extendで直接変更

df_vstack = df1.clone()
df_vstack.vstack(df2, in_place=True)  # vstackでその場で変更
row(df_extend, df_vstack)
shape: (4, 2)
ab
i64i64
13
24
57
68
shape: (4, 2)
ab
i64i64
13
24
57
68

両者の違いは、メモリ管理の方法にあります。

extend() は、引数のデータフレームの内容を元のデータフレームのメモリの末尾にコピーします。結果として、結合後のデータフレームのメモリは連続しています。ただし、元のデータフレームのメモリが再配置される可能性があるため、ループ内で頻繁に extend() を使用することは推奨されません。

vstack() は、結合する2つのデータフレームのメモリをそのままにし、リンクによって両者を結合します。そのため、結果として得られるデータフレームのメモリは不連続になります。この不連続性により、後で計算を行う際のパフォーマンスが低下する可能性があります。
一般的には、ループ内で vstack() を使用してデータを結合し、最後に rechunk() を実行してメモリを連続した状態に変換するのが効率的です。

print(f'{df_extend.n_chunks() = }')           # extend後のチャンク数
print(f'{df_vstack.n_chunks() = }')          # vstack後のチャンク数
print(f'{df_vstack.rechunk().n_chunks() = }')  # rechunk後のチャンク数
df_extend.n_chunks() = 1
df_vstack.n_chunks() = 2
df_vstack.rechunk().n_chunks() = 1

hstackメソッド#

DataFrame.hstack() は、2つのデータフレームを水平方向(列方向)に結合します。in_place=True を指定すると、元のデータフレームに列を追加します。

df1 = pl.DataFrame({"a": [1, 2], "b": [3, 4]})
df2 = pl.DataFrame({"c": [5, 6], "d": [7, 8]})

# in-place操作なし(新しいデータフレームを返す)
df3 = df1.hstack(df2)

# in-place操作あり
df1.hstack(df2, in_place=True)
row(df3, df1)
shape: (2, 4)
abcd
i64i64i64i64
1357
2468
shape: (2, 4)
abcd
i64i64i64i64
1357
2468

merge_sortedメソッド#

DataFrame.merge_sorted() は、あらかじめソートされた 2つのデータフレームを縦方向に結合し、指定したキー列に基づいてソートされた新しいデータフレームを返すメソッドです。このメソッドを使用する際は、入力となる両方のデータフレームが事前に指定したキー列でソートされていることが前提となります。

df1 = pl.DataFrame({"a": [3, 1, 5], "b": [6, 4, 2]})
df2 = pl.DataFrame({"a": [6, 2, 4], "b": [7, 5, 3]})

# "a" 列でソートしてからマージ
df3 = df1.sort("a").merge_sorted(df2.sort("a"), key="a")

# "b" 列でソートしてからマージ
df4 = df1.sort("b").merge_sorted(df2.sort("b"), key="b")

row(df3, df4)
shape: (6, 2)
ab
i64i64
14
25
36
43
52
67
shape: (6, 2)
ab
i64i64
52
43
14
25
36
67