Pythonでライフゲーム
大学のシミュレーション論の講義で知ったライフゲームをPythonで実装しました。
ライフゲームとは
このようなアルゴリズムに従って動くものをライフゲームと言います。
- 二次元の地図を用意し、各要素を0か1とする(よく0と1はそれぞれ生物がいない・いる状態と呼ばれる)
- ある1つのセルにおいて、周囲にはセルが8つ(端のセルの場合は3つだったり5つだったりする)を見た結果、
- 周囲に1つ以下しか1がいなかったら、そのある1つのセルの要素を0にする
- 周囲に2つ1がいたら、そのある1つのセルの要素は何も変えない
- 周囲に3つ1がいたら、そのある1つのセルの要素を1にする
- 周囲に4つ以上も1がいたら、そのある1つのセルの要素を0にする
非常に端折って説明したので、厳密ではない。けどこうやって説明したほうが計算効率が良くなるのでそうした。
特徴としては、
- 与えられた地図から生成される地図は必ず1通りで決まり、ランダム性はない
- 永久にループし続ける形状が存在する(「銀河」「ペンタデカスロン」「グライダー銃」などが種類として挙げられる)
- どんな1つ前の地図からも絶対にできない地図(これを「エデンの園」という)が存在する
- ライフゲーム自体からANDやOR、NOTを作れて、チューリング完全である。つまりやろうと思えば、ライフゲーム自体からライフゲームを作り出せる(もちろん、相当スペックの高いコンピューターが必要になる)
詳細については、WikipediaなりQiitaなりで調べてみてください。結構面白い(興味深いの意味)
どうやって組むのか
これだけは押さえておきたいという部分を上げてみました。
ブロードキャストを使う
普通に周囲にどれくらい1があるのかということを1つ1つのセルで確認するのはかなり計算的に無駄があります。
地図のサイズが\(10 \times 10 \)とか小さければそんなに問題はないと思います。ある地図から次の地図を生成して……、と考えても、せいぜい30イテレーションくらいだから。
計算回数は\(3 \times 3 \times 10 \times 10 \times 30 = 27000\) くらい?
最初と次の\( 3 \)はそれぞれ周囲のセルの行番号列番号、次とその次の\( 10\)はセルの数、最後はイテレーション数とか。
しかし、30×30とかになったとき、イテレーションは300とかになって、計算量は膨大になります。つまり計算回数は\(2.5M\)回とか?
オーダーで言えば、\( O (n ^3) \)くらいだと思う(ここで私がちゃんとアルゴリズムとデータ構造を勉強していないことがバレる)
それならブロードキャストを使ってしまおうと考えました。
- 行列\(M = \begin{bmatrix} 1 & 1 & 1 \\ 1 & 0 & 1 \\ 1 & 1 & 1 \end{bmatrix}\)をマスクとして、地図の行列に対して畳み込み演算を行なう
- 畳み込んだ結果は周囲にあるセルの数を表しているので、先ほどのアルゴリズムに従ってブロードキャストによって、セルの情報を0や1に書き換える
- 地図を更新する
「ブロードキャスト? 何それ美味しいの?」と思った人へ。NumPyを勉強し直す必要があります。
畳み込み演算を高速化
SciPyを使えばマスクを使った畳み込み演算なんて簡単に記述できます。しかも読みやすい。
しかし、SciPyの畳み込み演算signal.correlate2d
はめちゃくちゃ遅いので、使いたくない。
これもどうやるか考えた結果、こうなった。
- 地図のコピーを取り、1ピクセル分のパディングを取る
- 適切なインデックスを指定して8方向分の部分地図を足し算
描画の高速化
最初はMatplotLibで描画をしていたが、思うようにFPS(フレーム毎秒)を設定できなかったので結局、gifを生成する方式に変更しました。
というのも、ライフゲームの描画をmatplotlibつまりPython依存で実行されているためです。
地図更新はNumPy、つまりC/C++やFORTRAN依存だから速いのに、描画で時間を取られたくはない。
というわけで、PILを使ってImage
のim.save('fname.gif', ...)
を使って保存することにしました。
終了の確認
地図の更新はいつまでも続くものではありません。
しかし、ループする形状というものがライフゲームにはあるので、そいつらのために終了条件を設定する必要があります。
以下、解決策。
- ある地図からライフゲームのアルゴリズムに従って生成された地図をリストに保存
- ライフゲーム自体は全単射じゃないので、過去に遡って地図を保存することはできない。そのため、過去の全ての地図をリストに保存
- 過去の全ての地図のうち直近の最大20個を取り出して、同じ絵柄の地図があった時点で地図の更新を終了
- 地図の更新のリミットを5000回に設定して、たとえ地図の更新がその後続く虞があるとしてもそこで打ち切る
本当は寿命の長い形状もあるが、今回はパスすることにしました。
コード例
以下、コード。
import numpy as np
from matplotlib import pyplot as plt
from matplotlib import animation as animation
def deco(fn):
def inr_fn(mat):
res_li = [mat]
i = 0
for _ in range(5000):
res = fn(mat=mat.copy())
for res_ in res_li[-min(len(res_li), 20):]:
if np.all(res == res_):
return res_li
res_li.append(res)
mat = res.copy()
print('overflow')
return res_li
return inr_fn
@deco
def fn(mat: np.ndarray) -> np.ndarray:
_mat = mat.copy()
mat_ = np.pad(np.zeros_like(_mat), [1, 1], 'constant')
slice_li = [slice(None, -2), slice(1, -1), slice(2, None)]
for ix in slice_li:
for jx in slice_li:
if ix == jx == slice(1, -1):
continue
mat_[ix, jx] += _mat
mat_ = mat_[1:-1, 1:-1]
_mat[mat_ == 3] = 1
mat_[mat_ == 2] = 3
_mat[mat_ != 3] = 0
return _mat
def update(i):
if i % 200 == 0:
print(f'{i=}')
plt.clf()
plt.imshow(res_li[i])
def get_mat(fname: str) -> np.ndarray:
with open(fname, mode='r') as f:
glider_li = f.read().replace(' ', '0').split('\n')[:-1]
glider_li_li = [list(map(int, glider)) for glider in glider_li]
return np.array(glider_li_li, dtype=np.int8)
if __name__ == '__main__':
interval = 2
size = [60, 180]
mat = np.random.randint(0, 2, size, dtype=np.int8)
import sys
if len(sys.argv) > 1:
fname = sys.argv[1]
#fname = 'simu12_22mat/garaxy.mat'
mat = get_mat(fname)
res_li = fn(mat)
print(f'{len(res_li)=}')
fig, ax = plt.subplots(figsize=(5, 2), dpi=240)
_ = animation.FuncAnimation(fig, update, range(len(res_li)), interval=interval)
plt.show()
実行してみてわかることですが、終了するまでに出てくる全ての地図を計算するのにはそんなに時間がかかりません(せいぜい3秒)
しかし、matplotlibのところで、つまり地図の描画のところで非常に時間がかかる。
それにFPSを指定したのに、思い通りに動いてくれない!!
「どうしても描画を速くしたい!」というのなら、gifを作ってしまうのが得策だな、と思いました。
以下、gifを作るコード。
import numpy as np
import re
from PIL import Image
import itertools
class LifeGame(object):
def __init__(self):
self.matrix_li = []
def set_matrix(self, fname: str=None, shape: (int, int)=(120, 360)):
if fname is None:
matrix = np.random.randint(0, 2, shape, dtype=np.int8)
else:
with open(fname, mode='r') as f:
text_li = f.read().split('\n')
if text_li[-1] == '':
text_li = text_li[:-1]
max_len = max(len(text_li) for text_li in text_li_li)
text_li = [text + '0' * (max_len - len(text)) for text in text_li]
matrix = np.array([[int(ch) if ch == '1' else 0 for ch in text] for text in text_li], dtype=np.int8)
self.matrix_li.append(matrix)
def is_finished(self):
matrix_ = self.matrix_li[-1]
for matrix in self.matrix_li[self.judge:-1]:
if np.all(matrix == matrix_):
return True
return False
def fn(self, limit: int=5000, judge: int=20):
self.judge = -judge if type(judge) is int and judge > 0 else -20
for _ in range(limit):
self.update()
if self.is_finished():
return
def save_gif(self, ppc: int=2, fps: float=30): # ppc: pixel per cell
ppc = ppc if type(ppc) is int and ppc > 0 else 1
query = lambda matrix: -(matrix.astype(np.uint8).repeat(ppc, axis=0).repeat(ppc, axis=1))
im_li = [Image.fromarray(query(matrix)) for matrix in self.matrix_li]
im_li[0].save('anim.gif', save_all=True, append_images=im_li[1:], optimze=True, duration=1000/fps)
def update(self, *args):
_matrix = self.matrix_li[-1].copy()
# パディングを取ることで畳み込み演算の高速化
matrix_ = np.pad(np.zeros_like(_matrix), (1, 1), 'constant')
# scipy は計算速度が遅いから使わない
slice_tup = (slice(None, -2), slice(1, -1), slice(2, None))
for ix, jx in itertools.product(slice_tup, slice_tup):
if ix == jx == slice(1, -1):
continue
matrix_[ix, jx] += _matrix
matrix_ = matrix_[1:-1, 1:-1]
_matrix[matrix_ == 3] = 1 # 新たな生命の誕生
matrix_[matrix_ == 2] = 3 # 生命の存続
_matrix[matrix_ != 3] = 0 # 生命の死滅
self.matrix_li.append(_matrix)
if __name__ == '__main__':
lg = LifeGame()
lg.set_matrix(shape=(40, 120))
lg.fn(limit=5000)
print(f'{len(lg.matrix_li)=}')
lg.save_gif()
for文のところでitertools.product
を使っているのは、多重ループとか見ていて気持ち悪いからです。こっちの方が、確かに実行速度は1.6倍くらい遅くなってしまいますが、プログラムも恋人もきれいなほうが良いに決まってる。
とは言っても、どこまでトレードオフかということを考えるとSciPyのsignal.correlate2d(matrix, mask)
は速度が桁外れに遅いので論外。だったら多次元テンソル畳み込み演算なんて自分で実装するわ。
ちなみにMacでgifを再生するには、実ファイルにカーソルを合わせてスペースキーを長押しすればできます(例が小さいのは勘弁)
さいごに
ライフゲームを含めて、シミュレーション論で解説されたモデルをとりあえずひとまずプログラムとして組んでみました。
大変な部分もありましたが、 Google先生の力を借りてなんとか実装できました。
「実践形式の講義にしてほしい」という受動的な態度ではここまでできなかったと思います。
やっぱり勉強した内容を身につけるには、インタラクティブにコンストラクティブに(意識高い系が使いそうなカタカナ語笑)、頼まれもしないのに勝手にプログラムを組んでしまうことだ、と思いました。
結局それが開発経験とかに役立つのだから(インターンをしながらそう思った)
後日談
「インデックスを保存する方式ならメモリー消費を抑えることができるよ」というとある方からの提案で、とりあえずインデックス法(と勝手に命名する)で組んでみました。
import random
import itertools
import collections
from PIL import Image
import numpy as np
class MemoryCorruptionResistantLifeGameError(Exception):
pass
class MemoryCorruptionResistantLifeGame(object):
def __init__(self):
self.history = []
def set_ix_li(
self,
rows : 'row length' and int=8,
cols : 'col length' and int=8,
population: 'population of living people' and int=None,
ix_li : 'index of habitat' and [(int, int), ...]=None
) ->'index of habitat' and [(int, int), ...]:
self.rows, self.cols = rows, cols
if ix_li is None:
population = population if population is not None or (type(population) is int and population > 0) else random.randrange(rows * cols)
ix_li = [divmod(elem, rows)[::-1] for elem in random.sample(range(rows * cols), population)]
elif max(ix[0] for ix in ix_li) >= rows or max(ix[1] for ix in ix_li) >= cols:
raise Exception(f'matrix size must be {(rows, cols)=}')
ix_li.sort()
self.history.append(ix_li)
return ix_li
def set_history(
self,
limit: '' = None
):
def set_next_history():
_ix_li = self.history[-1]
ix_li = []
for r, c in _ix_li:
ix_li += [tuple(ix) for ix in itertools.product(range(max(0, r-1), min(r+2, self.rows)), range(max(0, c-1), min(c+2, self.cols)))]
counter = collections.Counter(ix_li)
for ix in _ix_li:
counter[ix] -= 1
self.history.append(sorted([ix for ix in _ix_li if counter[ix] == 2] + [ix for ix in counter if counter[ix] == 3]))
if limit is None:
while not (len(last_history := self.history[-1]) == 0 or last_history in self.history[:-1]):
set_next_history()
elif type(limit) is int and limit > 0:
for _ in range(limit):
set_next_history()
else:
raise Exception(f'"limit" must be None or int more than 0. <{limit=}>')
def save_gif(
self,
fname: 'filename for output' and str ='output.gif',
ppc : 'pixcel per cell' and int = 2,
fps : 'frame per sec' and float=30,
loop : 'loop times of gif' and int = 0
):
ppc = ppc if type(ppc) is int and ppc > 0 else 1
im_li = []
query = lambda matrix: matrix.repeat(ppc, axis=0).repeat(ppc, axis=1)
for part_history in self.history:
matrix = np.zeros((self.rows, self.cols), dtype=np.uint8)
matrix[tuple(zip(*part_history))] -= 1
im_li.append(Image.fromarray(query(matrix)))
im_li[0].save(
fname,
save_all =True,
append_images=im_li[1:],
optimze =True,
duration =1000 / fps,
loop =max(0, loop)
)
if __name__ == '__main__':
lg = MemoryCorruptionResistantLifeGame()
lg.set_ix_li(rows=120, cols=360)
lg.set_history()
print(len(lg.history))
lg.save_gif()
プログラムを動かしてみるとわかると思いますが、計算速度としては2番目よりも遅いです。
というのも、Python3ではfor文が遅いので、しかも2番目のはNumPyでC言語依存であるということもあって、そのようになってしまいます。
それでもメモリー消費を抑えられているので、やろうと思えばチューリングマシンになります。
私の研究したい分野は自然言語処理なのでさすがに組みたくはないですが、もし数理解析系や最適化系を専門として研究している人は、チューリングマシンをライフゲームから作るというのも悪くないのでは? と思いました()
やっぱりPythonistaはブロードキャスト使うのが通(つう)かな。
- 前の記事
Swift5でアプリを作った話(講義課題提出用) 2020.01.24
- 次の記事
あのさ、時代はJuliaだよ? 2020.02.02