2013年2月1日金曜日

[Clojure]LazySeqで嵌ったのでメモ

ClojureでCSVファイルを読み込んでいたら OutOfMemoryError に出会ってしまったので 戒めの意味をこめてメモ。


先頭2行がヘッダで残りの行がデータ、というExcelチックなファイルを読み込んでみます。

まず、読み込む対象となる大きなCVSファイルを作成します。

(require '[clojure.data.csv :as csv])

;; ファイル名
(def LARGE-CSV-FILE "large-file.csv")

;; ファイル作成
(with-open [w (clojure.java.io/writer LARGE-CSV-FILE)]
  (.write w "ID1, ID2, ID3, ID4\n")
  (.write w "a, b, c, d\n")
  (doseq [x (range 5000000)]
    (.write w (format "%s, %s, %s, %s\n" (rand) (rand) (rand) (rand)))))

CSVファイルを読み込んで指定した関数を実行する補助関数を定義します。
clojure.data.csv/read-csv はCSVファイルの内容を表すLazySeqを返すので、 指定された関数に渡される際にはまだ実際の読み込み処理は行われていません。

;; CSVを表すLazySeqを引数として関数を実行する関数
(defn call-with-large-csv [f]
  (with-open [r (clojure.java.io/reader LARGE-CSV-FILE)]
    (f (csv/read-csv r))))

CSVファイルを表すLazySeqから、先頭行を取得してみます。

(call-with-large-csv
  (fn [csv] (first csv)))
;; => ["ID1" " ID2" " ID3" " ID4"]

この処理は即座に終了します。 LazySeqのおかげで、実際に読み込まれたのは500万行あるCSVファイルのうち1行だけ(のはず)です。

次に、 ヘッダを表す先頭の2行を取得してみます。

(call-with-large-csv
  (fn [csv] (take 2 csv)))
;; => IOException Stream closed  java.io.BufferedReader.ensureOpen

この処理は正常に終了しません。 「ストリームが既にクローズされています」という例外が発生してしまいます。

これは、take が返す値がLazySeqであるために起こります。
補助関数(call-with-large-csv)の外側でLazySeqの要素が必要(replで表示したい)になると、 その時になってから実際の読み込み処理が行われますが、このときにはもう(with-openによって)入力ストリームはクローズしています。

この処理を正常に行うには、実際の読み込み処理をストリームがオープンしている間に行わなければなりません。 読み込み処理を行いたいタイミングで、doallマクロで囲んだり、ベクタに変換したりしましょう。

(call-with-large-csv
  (fn [csv] (doall (take 2 csv))))
;; => (["ID1" " ID2" " ID3" " ID4"] ["a" " b" " c" " d"])

(call-with-large-csv
  (fn [csv] (vec (take 2 csv))))
;; => [["ID1" " ID2" " ID3" " ID4"] ["a" " b" " c" " d"]]

次に、ファイルの最終行を取得してみます。

(call-with-large-csv
  (fn [csv] (last csv)))
;; => ["0.9335203181407478" " 0.19775692522243427" " 0.21165896344265756" " 0.9433273177661132"]

多少時間はかかりますが、正常に終了します。

同様に、doseqで各行に対して処理を行うふりをしても、処理は正常に終了します。

(call-with-large-csv
  (fn [csv]
    (doseq [x csv] nil)))
;; => nil

doseqで処理を行った後で、先頭行を取得してみます。

(call-with-large-csv
  (fn [csv]
    (doseq [x csv] nil)
    (first csv)))
;; => java.lang.OutOfMemoryError: Java heap space

この処理はメモリが足りないと言われて異常終了します。
処理の最後にLazySeqの先頭要素が必要なので、CSVファイルの内容を保持しておくために doseq中にGCでメモリを回収できないからだと思われます。

以下のようにdoseqよりも前に先頭行を取得しておけば、CSVファイル(LazySeq)の内容すべてを 保持する必要はないため、メモリが足りなくなることなく正常に終了します。

(call-with-large-csv
  (fn [csv]
    (let [header (first csv)]
      (doseq [x csv] nil)
      header)))
;; => ["ID1" " ID2" " ID3" " ID4"]

次に、先頭行だけでなく、先頭の2行(ヘッダ)を取得してみます。

;; 先頭2行のヘッダを取得する関数
(defn get-headers [csv]
  (take 2 csv))

(call-with-large-csv
  (fn [csv]
    (let [headers (get-headers csv)]
      (doseq [x csv] nil)
      headers)))
;; => java.lang.OutOfMemoryError: Java heap space

この処理は異常終了します。 原因は get-headers が返す値がLazySeqであり、doseqの後までCSVの内容を保持しておく必要があるためかと思います。

私はここで悩みました。直接take関数などを書いていればすぐに気づいたかもしれませんが、 「ヘッダを取得」という関数を作成したつもりでいたために返値がLazySeqであるということを忘れていました。

この場合も、処理を正常に終了させるためには、あらかじめdoallなどでLazySeqの要素を計算し、 巨大なデータを保持しなくても良いようにします。

(call-with-large-csv
  (fn [csv]
    (let [headers (doall (get-headers csv))]
      (doseq [x csv] nil)
      headers)))
;; => (["ID1" " ID2" " ID3" " ID4"] ["a" " b" " c" " d"])

ファイルに限らず、LazySeqで巨大なデータを扱うときは、処理の途中でGCできるかどうか気を付けようと思います。

0 件のコメント:

コメントを投稿