ファイルの入出力#

Polarsでは、大規模なデータセットを効率的に扱うための高速なファイル入出力操作が提供されています。データを読み込んだり書き出したりする際に、さまざまなフォーマットに対応しており、迅速なデータ処理をサポートします。この章では、Polarsを使用したファイルの入出力操作方法について詳しく説明します。

import polars as pl
from helper.jupyter import row, capture_except

CSVファイル#

CSVファイルを読み込む際には、ファイル構造やデータの特性に応じて柔軟に操作する必要があります。本セクションでは、Polarsを使用してさまざまなCSVファイルを読み込む方法を紹介します。

読み込み#

ヘッダー#

CSVファイルには、ヘッダーの有無や、ヘッダーが複数行にわたる場合があります。以下のデータを例に、ヘッダーの扱い方について説明します。

%%writefile data/csv_header.csv
A,B
a,b
0,1
2,3
4,5
Overwriting data/csv_header.csv
  • df1: デフォルト設定では、CSVファイルをヘッダー付きとして読み込みます。この場合、データの先頭行が列の名前として解釈されます。

  • df2: has_header=Falseを指定することで、CSVの先頭行をデータとして扱います。この場合、new_columns引数を使用して列名を自分で指定できます。

  • df3: skip_rows引数を指定することで、最初のN行をスキップしてからデータを読み込むことができます。

  • df4: skip_rows_after_header引数を指定することで、ヘッダー行の次のN行をスキップしてデータを読み込みます。

  • df5: 最初の2行をヘッダーなしで読み込んで、それぞれの列を結合した結果をnew_columns引数に渡し、新しい列名として適用します。この方法を使うことで、複数行のヘッダーを柔軟に扱うことができます。

これらの方法を活用することで、CSVデータの構造に応じた柔軟な読み込みが可能になります。

fn = 'data/csv_header.csv'
df1 = pl.read_csv(fn)
df2 = pl.read_csv(fn, has_header=False, new_columns=['x', 'y'])
df3 = pl.read_csv(fn, skip_rows=1)
df4 = pl.read_csv(fn, skip_rows_after_header=1)

df_header = pl.read_csv(fn, n_rows=2, has_header=False)
columns = df_header.select(pl.all().str.join('-')).row(0)
df5 = pl.read_csv(fn, has_header=False, skip_rows=2, new_columns=columns)
row(df1, df2, df3, df4, df5)
shape: (4, 2)
AB
strstr
"a""b"
"0""1"
"2""3"
"4""5"
shape: (5, 2)
xy
strstr
"A""B"
"a""b"
"0""1"
"2""3"
"4""5"
shape: (3, 2)
ab
i64i64
01
23
45
shape: (3, 2)
AB
i64i64
01
23
45
shape: (3, 2)
A-aB-b
i64i64
01
23
45

列のデータ型#

infer_schema引数がデフォルト値Trueの場合、infer_schema_length引数で指定された先頭の行数を使用して各列のデータ型を推定します。この範囲を超えて異なるデータ型の値が出現した場合、エラーが発生します。以下のデータを例に、データ型の扱い方について説明します。

%%writefile data/csv_different_type.csv
A,B
0,1
2,3
4,5
a,5.5
10,20
Overwriting data/csv_different_type.csv

infer_schema_lengthのデフォルト値は100ですが、以下のコードでは、infer_schema_lengthを2行に設定してエラーを発生させます。

%%capture_except
df = pl.read_csv('data/csv_different_type.csv', infer_schema_length=2)
ComputeError: could not parse `a` as dtype `i64` at column 'A' (column number 1)

The current offset in the file is 15 bytes.

You might want to try:
- increasing `infer_schema_length` (e.g. `infer_schema_length=10000`),
- specifying correct dtype with the `schema_overrides` argument
- setting `ignore_errors` to `True`,
- adding `a` to the `null_values` list.

Original error: ```remaining bytes non-empty```

エラーメッセージにはいくつかの解決方法が示されています。以下はそれらの方法を使用してデータを読み込む例です。

  • df1: infer_schema_length引数で推定行数を増やすことで、A列のデータ型をstr、B列をf64として読み込みます。

  • df2: infer_schema_length=Noneを指定すると、すべての行を使用してデータ型を推定します。また、null_values引数を使用して特定の値をnullと見なすことで、A列をi64として読み込みます。

  • df3: ignore_errors=Trueを指定すると、推定データ型に一致しない値をnullとして読み込みます。この場合、A列とB列はどちらもi64になります。

  • df4: schema_overrides引数を使用して、各列のデータ型を明示的に指定します。さらに、ignore_errors=Trueを指定して不正な値を除外します。schema_overridesを使用すると、効率的なデータ型を選択でき、メモリ使用量を削減できます。

これらの方法を使用することで、データ型の推定やエラー処理に柔軟に対応できます。

fn = 'data/csv_different_type.csv'
df1 = pl.read_csv(fn, infer_schema_length=1000)
df2 = pl.read_csv(fn, infer_schema_length=None, null_values=['a'])
df3 = pl.read_csv(fn, infer_schema_length=2, ignore_errors=True)
df4 = pl.read_csv(fn, schema_overrides={'A':pl.Int16, 'B':pl.Float32}, ignore_errors=True)
row(df1, df2, df3, df4)
shape: (5, 2)
AB
strf64
"0"1.0
"2"3.0
"4"5.0
"a"5.5
"10"20.0
shape: (5, 2)
AB
i64f64
01.0
23.0
45.0
null5.5
1020.0
shape: (5, 2)
AB
i64i64
01
23
45
nullnull
1020
shape: (5, 2)
AB
i16f32
01.0
23.0
45.0
null5.5
1020.0

スペース処理#

CSVデータ内の列値に末尾のスペースが含まれている場合、Polarsの標準CSVエンジンはこれをそのまま取り込み、列データ型をstrとして解釈します。例えば、次のようなCSVデータを読み込む場合を考えます:

%%writefile data/csv_trailing_space.csv
str,int,float
日本語 ,4 ,5.67 
abc ,5 ,1.23 
Overwriting data/csv_trailing_space.csv

このデータを読み込むと、Polarsの標準エンジンとuse_pyarrow=Trueを指定した場合で動作が異なります:

  • df1: Polarsの標準エンジンでは、すべての列が文字列(str)として扱われます。

  • df2: use_pyarrow=Trueを指定すると、数値列(int, float)が適切に解釈されます。

fn = 'data/csv_trailing_space.csv'
df1 = pl.read_csv(fn)
df2 = pl.read_csv(fn, use_pyarrow=True)
row(df1, df2)
shape: (2, 3)
strintfloat
strstrstr
"日本語 ""4 ""5.67 "
"abc ""5 ""1.23 "
shape: (2, 3)
strintfloat
stri64f64
"日本語 "45.67
"abc "51.23

Polarsでは文字列列を自動的に数値型に変換するカスタム関数を作成することで、スペースを取り除きつつ適切にキャストできます。以下はその例です。

  1. s.str.strip_chars() を使用して余分なスペースを削除。

  2. .cast(int_type) を試みて、整数型に変換できるかを確認。

  3. 整数型への変換が失敗した場合は .cast(float_type) を試みて、浮動小数型に変換。

  4. どちらのキャストも失敗した場合には元の文字列型を返す。

from polars import selectors as cs
from polars.exceptions import InvalidOperationError

# この関数はhelper/polars.pyにあります。
def try_cast_to_number(s, int_type=pl.Int64, float_type=pl.Float64):
    try:
        return s.str.strip_chars().cast(int_type)
    except InvalidOperationError:
        try:
            return s.str.strip_chars().cast(float_type)
        except InvalidOperationError:
            return s

df1.with_columns(cs.string().map_batches(try_cast_to_number))
shape: (2, 3)
strintfloat
stri64f64
"日本語 "45.67
"abc "51.23

複数のファイルを読み込み#

次のコードを実行して、stockデータをdata/stockフォルダにダウンロードします。

from helper.utils import download_folder_from_github, print_folder_structure
download_folder_from_github("https://github.com/jeroenjanssens/python-polars-the-definitive-guide/tree/main/data/stock", "data/stock")
data/stock exists, please delete and try again.

ダウンロードしたデータのフォルダ構造は次のようなものです。

print_folder_structure("data/stock", max_file_count=3)
|-- asml
    |-- 1999.csv
    |-- 2000.csv
    |-- 2001.csv
    |-- ...
|-- nvda
    |-- 1999.csv
    |-- 2000.csv
    |-- 2001.csv
    |-- ...
|-- tsm
    |-- 1999.csv
    |-- 2000.csv
    |-- 2001.csv
    |-- ...

上のすべてのCSVファイルを一気に読み込みするのは、ファイルパスにワイルドカードを使用します。

df_stock = pl.read_csv('data/stock/**/*.csv', try_parse_dates=True)
df_stock
shape: (18_476, 8)
symboldateopenhighlowcloseadj closevolume
strstrf64f64f64f64f64i64
"ASML""1999-01-04"11.76562512.2812511.76562512.1406257.5225231801867
"ASML""1999-01-05"11.85937514.2511.7187513.968758.6552578241600
"ASML""1999-01-06"14.2517.60156314.20312516.87510.45601816400267
"ASML""1999-01-07"14.74218817.812514.5312516.85156310.44149517722133
"ASML""1999-01-08"16.07812516.28906315.02343815.7968759.78799510696000
"TSM""2023-06-26"102.019997103.040001100.089996100.11000199.1259548560000
"TSM""2023-06-27"101.150002102.790001100.019997102.080002101.0765919732000
"TSM""2023-06-28"100.5101.879997100.220001100.91999899.9279868160900
"TSM""2023-06-29"101.339996101.519997100.019997100.63999999.6507427383900
"TSM""2023-06-30"101.400002101.889999100.410004100.91999899.92798611701700

この例では各個CSVファイル中のデータにはファイル名の情報が含まれているため、ファイル名を列として作成する必要がないですが、ファイル名を結果に追加したい場合は、include_file_paths引数を使います。ファイル名列の名前はこの引数で指定された列名になります。read_csv()はまだこの引数をサポートしていないため、次の例では遅延演算のscan_csv()関数を使います。この関数のリターン値はLazyFrameで、collect()メソッドを使って実際のデータを読み出します。

df_stock = pl.scan_csv('data/stock/**/*.csv', try_parse_dates=True, include_file_paths="path")
df_stock
naive plan: (run LazyFrame.explain(optimized=True) to see the optimized plan)

Csv SCAN [data\stock\asml\1999.csv, ... 74 other sources]

PROJECT */9 COLUMNS
df_stock.collect()
shape: (18_476, 9)
symboldateopenhighlowcloseadj closevolumepath
strdatef64f64f64f64f64i64str
"ASML"1999-01-0411.76562512.2812511.76562512.1406257.5225231801867"data\stock\asml\1999.csv"
"ASML"1999-01-0511.85937514.2511.7187513.968758.6552578241600"data\stock\asml\1999.csv"
"ASML"1999-01-0614.2517.60156314.20312516.87510.45601816400267"data\stock\asml\1999.csv"
"ASML"1999-01-0714.74218817.812514.5312516.85156310.44149517722133"data\stock\asml\1999.csv"
"ASML"1999-01-0816.07812516.28906315.02343815.7968759.78799510696000"data\stock\asml\1999.csv"
"TSM"2023-06-26102.019997103.040001100.089996100.11000199.1259548560000"data\stock\tsm\2023.csv"
"TSM"2023-06-27101.150002102.790001100.019997102.080002101.0765919732000"data\stock\tsm\2023.csv"
"TSM"2023-06-28100.5101.879997100.220001100.91999899.9279868160900"data\stock\tsm\2023.csv"
"TSM"2023-06-29101.339996101.519997100.019997100.63999999.6507427383900"data\stock\tsm\2023.csv"
"TSM"2023-06-30101.400002101.889999100.410004100.91999899.92798611701700"data\stock\tsm\2023.csv"

書き出し#

DataFrame.write_csv() を使用して、データフレームをCSVファイルとして出力できます。

df1.write_csv('data/csv_output_utf8.csv')

write_csv()メソッドでは、include_headerseparatorline_terminator などの引数を使って、CSVのフォーマットを細かく指定できます。ただし、エンコードを直接指定する引数はありません。そのため、一度CSVデータを StringIO に出力し、それを使用してSHIFT-JISエンコードのファイルとして保存する方法を取ります。以下はそのコード例です:

import io

buf = io.StringIO()
df1.write_csv(buf)

with open('data/csv_output_shiftjis.csv', 'w', encoding='shift-jis') as f:
    f.write(buf.getvalue())

Excelファイル#

read_excel()DataFrame.write_excel()を使用してExcelファイルの入出力を行います。Excelファイルの読み込みには、calaminexlsx2csvopenpyxlの3つのエンジンが利用可能で、デフォルトのcalamineは最も高速です。書き出しにはxlsxwriterライブラリを使用します。次のコマンドで必要なライブラリをインストールします。

conda install fastexcel xlsxwriter

読み込み#

次のコードは、指定されたシートからデータを読み込みます。sheet_id引数には、読み込みたいシート番号(1から始まる整数)或いはシート番号のリストを指定します。0の場合はすべてのシートを読み込みます。複数のシートを読み込む場合、シート名をキー、データフレームを値とする辞書を返します。sheet_names引数を使用して、シート名で読み込み対象のシートを指定することもできます。

fn = 'data/xlsx_example.xlsx'
df1, df2 = pl.read_excel(fn, sheet_id=[1, 2]).values()
row(df1, df2)
shape: (3, 2)
AB
i64str
1"aa"
3"bb"
5"cc"
shape: (3, 3)
xyz
f64f64f64
1.01.03.1
1.22.14.3
3.24.35.4
df1, df2, df3 = pl.read_excel(fn, sheet_id=0).values()
row(df1, df2, df3)
shape: (3, 2)
AB
i64str
1"aa"
3"bb"
5"cc"
shape: (3, 3)
xyz
f64f64f64
1.01.03.1
1.22.14.3
3.24.35.4
shape: (4, 2)
AB
strstr
"a""b"
"0""1"
"2""3"
"4""5"

シートSheet3には2行のヘッダーがあるため、直接読み込むと、2行目のヘッダーがデータとして扱われ、すべての列のデータ型が文字列になります。この問題を解決するために、read_options引数を使用してExcelエンジンに渡す設定を調整できます。

以下のコードでは、1回目のread_excel()n_rows=2header_row=Noneを指定し、先頭の2行をデータとして読み込んで、文字列結合し列名columnsを計算します。2回目の読み込みでは、skip_rows=2でヘッダーをスキップし、column_names=columnsで列名を指定します。

df_header = pl.read_excel(fn, sheet_id=3, read_options=dict(n_rows=2, header_row=None))
columns = df_header.select(pl.all().str.join('-')).row(0)
df3 = pl.read_excel(fn, sheet_id=3, read_options=dict(skip_rows=2, column_names=columns))
df3
shape: (3, 2)
A-aB-b
i64i64
01
23
45

calamineエンジンを使用する場合、read_optionsに渡す引数は次のようになります。

from fastexcel import ExcelReader
ExcelReader.load_sheet?
Signature:
ExcelReader.load_sheet(
    self,
    idx_or_name: 'int | str',
    *,
    header_row: 'int | None' = 0,
    column_names: 'list[str] | None' = None,
    skip_rows: 'int | None' = None,
    n_rows: 'int | None' = None,
    schema_sample_rows: 'int | None' = 1000,
    dtype_coercion: "Literal['coerce', 'strict']" = 'coerce',
    use_columns: 'list[str] | list[int] | str | Callable[[ColumnInfo], bool] | None' = None,
    dtypes: 'DType | DTypeMap | None' = None,
) -> 'ExcelSheet'
Docstring:
Loads a sheet lazily by index or name.

:param idx_or_name: The index (starting at 0) or the name of the sheet to load.
:param header_row: The index of the row containing the column labels, default index is 0.
                   If `None`, the sheet does not have any column labels.
                   Any rows before the `header_row` will be automatically skipped.
:param column_names: Overrides headers found in the document.
                     If `column_names` is used, `header_row` will be ignored.
:param n_rows: Specifies how many rows should be loaded.
               If `None`, all rows are loaded
:param skip_rows: Specifies how many rows should be skipped after the `header_row`.
                  Any rows before the `header_row` are automatically skipped.
                  If `header_row` is `None`:
                    - if `skip_rows` is `None` (default): it skips all empty rows
                    at the beginning of the sheet.
                    - if `skip_rows` is a number, it skips the specified number
                    of rows from the start of the sheet.
:param schema_sample_rows: Specifies how many rows should be used to determine
                           the dtype of a column.
                           If `None`, all rows will be used.
:param dtype_coercion: Specifies how type coercion should behave. `coerce` (the default)
                       will try to coerce different dtypes in a column to the same one,
                       whereas `strict` will raise an error in case a column contains
                       several dtypes. Note that this only applies to columns whose dtype
                       is guessed, i.e. not specified via `dtypes`.
:param use_columns: Specifies the columns to use. Can either be:
                    - `None` to select all columns
                    - A list of strings and ints, the column names and/or indices
                      (starting at 0)
                    - A string, a comma separated list of Excel column letters and column
                      ranges (e.g. `“A:E”` or `“A,C,E:F”`, which would result in
                      `A,B,C,D,E` and `A,C,E,F`)
                    - A callable, a function that takes a column and returns a boolean
                      indicating whether the column should be used
:param dtypes: An optional dtype (for all columns)
               or dict of dtypes with keys as column indices or names.
File:      c:\micromamba\envs\cad\lib\site-packages\fastexcel\__init__.py
Type:      function

書き出し#

DataFrame.write_excel()を使って、指定したファイル名とシート名にデータを書き出します。複数のデータフレームを同じファイルの別々のシートに書き出す場合、次のコードのようにWorkbookオブジェクトを作成し、そのオブジェクトに対して複数回.write_excel()を呼び出します。

import xlsxwriter

with xlsxwriter.Workbook("data/xlsx_example_output.xlsx") as wb:
    df1.write_excel(wb, "df1")
    df2.write_excel(wb, "df2")
    df3.write_excel(wb, "df3")    

バイナリファイル#

データフレーム操作を効率的に行うために、Polarsはさまざまなバイナリファイルフォーマットをサポートしています。特に、ipc(Inter-Process Communication)とparquetは、データの保存や転送に適したフォーマットです。

  1. IPC(Inter-Process Communication):

    • IPCフォーマットは、プロセス間での高速なデータ交換を目的としたバイナリ形式です。

    • Polarsでは、データフレームを効率的に保存および読み込むために使用され、特に同一のメモリ空間内で異なるプロセスがデータをやり取りする際に便利です。

    • 高速なシリアル化とデシリアル化が可能で、大きなデータセットの転送に最適です。

  2. Parquet:

    • Parquetは、列指向のデータフォーマットで、大規模なデータを効率的に保存できるように設計されています。

    • 特に圧縮効率が高く、読み込み時には必要な列だけを効率的に取得できるため、ストレージとI/Oパフォーマンスを最適化できます。

    • 分散処理環境(例: Apache Spark)でよく使用される形式で、データ分析やETLパイプラインで広く利用されています。

どちらのフォーマットも、Polarsで高速なデータ処理を行う際に重要な役割を果たします。ipcは主にプロセス間通信で使用され、parquetはストレージと分析の効率化に優れています。

ipcparquetフォーマットに対して、DataFrameデータの読み込みと書き出しにはread_*()write_*()を使用します。一方、LazyFrameデータの操作にはscan_*()sink_*()を使用します。それぞれの関数とメソッドについて以下に説明します。

  • read_ipc()read_parquet() 関数: バイナリファイルからデータを読み込み、DataFrameオブジェクトを取得します。

  • DataFramewrite_ipc()write_parquet() メソッド: DataFrameオブジェクトをバイナリファイルに書き出します。

  • scan_ipc()scan_parquet() 関数: バイナリファイルからデータを遅延評価形式で読み込み、LazyFrameオブジェクトを取得します。

  • LazyFramesink_ipc()sink_parquet() メソッド: LazyFrameオブジェクトをバイナリファイルストリームに書き出します。

既にメモリ上にデータがあるDataFrameのデータをファイルに保存する場合はwrite_*()メソッドを使いますが、遅延評価を使用するLazyFrameのデータをファイルに書き出す場合は、sink_*()メソッドを使用します。このメソッドは一度にすべてのデータを処理するのではなく、チャンク単位でデータを処理するため、データのサイズがメモリより大きくても問題なく処理できます。

次のコードは、scan_csv()関数とsink_ipc()メソッドを使って、複数のCSVファイルを一つのIPCファイルに変換する処理です。これにより、大量のCSVデータをメモリに負荷をかけることなく効率的にIPCフォーマットに変換でき、後で高速に読み込むことが可能になります。

df_stock = pl.scan_csv('data/stock/**/*.csv', try_parse_dates=True)
df_stock.sink_ipc('data/stock.arrow')
df_stock = pl.read_ipc('data/stock.arrow')
df_stock
shape: (18_476, 8)
symboldateopenhighlowcloseadj closevolume
strdatef64f64f64f64f64i64
"ASML"1999-01-0411.76562512.2812511.76562512.1406257.5225231801867
"ASML"1999-01-0511.85937514.2511.7187513.968758.6552578241600
"ASML"1999-01-0614.2517.60156314.20312516.87510.45601816400267
"ASML"1999-01-0714.74218817.812514.5312516.85156310.44149517722133
"ASML"1999-01-0816.07812516.28906315.02343815.7968759.78799510696000
"TSM"2023-06-26102.019997103.040001100.089996100.11000199.1259548560000
"TSM"2023-06-27101.150002102.790001100.019997102.080002101.0765919732000
"TSM"2023-06-28100.5101.879997100.220001100.91999899.9279868160900
"TSM"2023-06-29101.339996101.519997100.019997100.63999999.6507427383900
"TSM"2023-06-30101.400002101.889999100.410004100.91999899.92798611701700

上記の関数では、1つのデータフレームが1つのファイルに対応する形で処理されますが、write_ipc_stream()read_ipc_stream()を使用することで、複数のデータフレームを順番に1つのファイルに書き出し、またそれを読み込むことができます。次のコードは、バイナリファイルfに2つのデータフレームを書き出す例です。

df1 = pl.DataFrame({"a":[1, 2, 3], "b":["xx", "yyy", "zzzz"]})
df2 = pl.DataFrame({"x":[3.0, 4.0, 6.0, 7.0], "y":[1.0, 2.0, 3.0, 4.0], "z":[0.0, 0.1, 0.2, 0.3]})

with open('data/stream_test.arrow', 'wb') as f:
    df1.write_ipc_stream(f)
    df2.write_ipc_stream(f)

次のコードで、バイナリファイルから順番にデータフレームを読み込むことができます。

Note

Polars 1.20.0までのバージョンでは、デフォルト値use_pyarrow=Falseを使用した場合、このコードは正しく動作しません。use_pyarrow=Trueを指定する必要があります。

with open('data/stream_test.arrow', 'rb') as f:
    df1_r = pl.read_ipc_stream(f, use_pyarrow=True)
    df2_r = pl.read_ipc_stream(f, use_pyarrow=True)

print(df1.equals(df1_r), df2.equals(df2_r))
True True