FixedPoint#

Z3のFixedpointは、再帰的な論理関係を解くためのエンジンであり、特にデータフロー解析や到達可能性解析(reachability analysis)などの問題に適用されます。Fixedpointは、再帰的な述語を扱うことができ、プログラムの性質検証、モデル検査、関係データベースのクエリ処理などに利用されます。

デフォルトのFixedpointエンジンは、ボトムアップ型のDatalogエンジンです。Datalogエンジンとは、関係データベースの問い合わせや論理推論を行うためのエンジンです。主に 関係データの推論や再帰的なクエリ処理 に利用されます。述語論理をベースにしたデータ操作が可能であり、データベースの問い合わせ(SQLの再帰的クエリに相当)に似た動作をします。

グラフの到達可能性#

以下の例では、Datalogエンジンを使用してグラフの到達可能性を求めます。対象となるのは、以下の無向グラフです。頂点1から頂点6へ到達できるかどうかを調べ、到達可能な場合はその経路を出力します。

        flowchart LR
1 --- 2
1 --- 5
5 --- 2
4 --- 5
2 --- 3
4 --- 3
4 --- 6
    
from z3 import *

次のコードは、Z3のDatalogエンジンを使って無向グラフの到達可能性を判定するものです。

fp = Fixedpoint()
fp.set(engine='datalog')
fp.set('datalog.generate_explanations', True)
s = BitVecSort(8)

connect = Function('connect', s, s, BoolSort())
edge = Function('edge', s, s, BoolSort())
path = Function('path', s, s, BoolSort())

a = Const('a', s)
b = Const('b', s)
c = Const('c', s)

fp.register_relation(path, edge, connect)

fp.declare_var(a, b, c)
fp.rule(path(a, b), connect(a, b))
fp.rule(path(a, c), [connect(a, b), path(b, c)])

fp.rule(connect(a, b), edge(a, b))
fp.rule(connect(a, b), edge(b, a))

fp.fact(edge(1, 2))
fp.fact(edge(1, 5))
fp.fact(edge(5, 2))
fp.fact(edge(4, 5))
fp.fact(edge(2, 3))
fp.fact(edge(4, 3))
fp.fact(edge(4, 6))

まず、次のコードでFixedpointエンジンの設定を行います。

fp = Fixedpoint()
fp.set(engine='datalog')  
fp.set('datalog.generate_explanations', True)
  • engine='datalog':Datalogエンジンを使用

  • datalog.generate_explanations = True:推論の説明(explanations)を取得可能にします。この説明から到達ルートを抽出することができます。


次に、8ビットのビットベクトル型 BitVecSort(8)を定義し、頂点の識別子のデータ型として使用します。

s = BitVecSort(8)

次に、グラフの関係(述語)を定義します。

connect = Function('connect', s, s, BoolSort())
edge = Function('edge', s, s, BoolSort())
path = Function('path', s, s, BoolSort())
  • edge(a, b):頂点 a から b へ直接つながっている(辺がある)

  • connect(a, b)a から b へ移動可能(無向グラフのため双方向を考慮)

  • path(a, b)a から b へ到達可能な場合に成り立つ(再帰的に探索)


次に、Datalogルールを定義するときに使用する変数を宣言します。

a = Const('a', s)
b = Const('b', s)
c = Const('c', s)

Fixedpointエンジンに述語を登録し、変数を宣言します。

fp.register_relation(path, edge, connect)
fp.declare_var(a, b, c)

次に、推論ルールを定義します。

fp.rule(path(a, b), connect(a, b))
fp.rule(path(a, c), [connect(a, b), path(b, c)])
  • a から b へ 直接つながっていれば(connect(a, b))、a から b へ到達可能(path(a, b)

  • a から b へ移動でき、さらに b から c へ到達可能なら、a から c へも到達可能

fp.rule(connect(a, b), edge(a, b))
fp.rule(connect(a, b), edge(b, a))
  • a から b に直接つながっているなら(edge(a, b))、移動可能(connect(a, b)

  • 無向グラフなので、b から a に直接つながっている場合も移動可能とする


次に、グラフの辺(edge)を事実(fact)として登録します。

fp.fact(edge(1, 2))
fp.fact(edge(1, 5))
fp.fact(edge(5, 2))
fp.fact(edge(4, 5))
fp.fact(edge(2, 3))
fp.fact(edge(4, 3))
fp.fact(edge(4, 6))

次のコードで、頂点 1 から 頂点 6 へ到達可能かどうかを問い合わせ(クエリ)し、結果を取得・出力します。

print(fp.query(path(1, 6)))
ans = fp.get_answer()
print(ans)
sat
And(Var(0) ==
    query!6() :-  path(#x01,#x06).(path(#2,#0) :-  path(#1,#0), connect(#2,#1).(path(#2,#0) :-  path(#1,#0), connect(#2,#1).(path(#1,#0) :-  connect(#1,#0).(connect(#1,#0) :-  edge(#1,#0).(edge(#x04,#x06).)),
                                        connect(#1,#0) :-  edge(#0,#1).(edge(#x04,#x05).)),
                                        connect(#1,#0) :-  edge(#1,#0).(edge(#x01,#x05).))))

ans の結果は木構造になっており、各ノードはchildren()を使って、その子ノードを取得できます。次の iter_all_children() 関数は、 木構造内のすべてのノードを再帰的に取得する関数です。この関数を利用して、名前がedgeで始まるノードを抽出し、逆順で出力します。これにより、頂点 1 から 頂点 6 までの経路を取得できます。

def iter_all_children(ans):
    yield ans
    for child in ans.children():
        yield from iter_all_children(child)

for node in list(iter_all_children(ans))[::-1]:
    if str(node).startswith("edge"):
        print(node)
edge(#x01,#x05).
edge(#x04,#x05).
edge(#x04,#x06).

また、頂点1から頂点7への経路を調べた結果、unsat(解なし)が得られ、経路が存在しないことが確認されました。

fp.query(path(1, 7))
unsat

3つのバケツ問題#

3つのバケツ問題とは、3リットル、5リットル、8リットルのバケツを使って、指定された量の水を測る問題です。初期状態では、8リットルのバケツに水が一杯入っており、他の2つのバケツは空です。目的は、2つのバケツにそれぞれ4リットルずつ分けることです。

この問題は、有向辺を持つグラフの到達可能性問題に変換できます。グラフの頂点は3つのバケツの水の状態を表し、有向辺はその状態から別の状態に移行できるかどうかを示します。

次のコードでは、すべての可能な状態 states を計算し、二つの状態の間で直接移行できる場合、それを辺として transitions に追加します。

from itertools import product

states = []
sizes = (8, 5, 3)
for a, b, c in product(*[range(size + 1) for size in sizes]):
    if a + b + c == 8:
        states.append((a, b, c))

transitions = []
for s1, s2 in product(states, states):
    buckets = [(v, v2) for (v, v1, v2) in zip(sizes, s1, s2) if v1 != v2]
    if len(buckets) != 2:
        continue
    if any([v1 == v2 or v2 == 0 for v1, v2 in buckets]):
        transitions.append((s1, s2))

次にFixedpointを使って到達ルートを探します。基本的な考え方は前と同じです。

  • pour述語:水を1つのバケツから別のバケツに移動する操作を表す述語(関数)を定義します。pour(a, b, c, a2, b2, c2)は、バケツの状態a, b, cから状態a2, b2, c2に移行できるかどうかを判定します。

  • plan述語:水を移動する計画を表す述語(関数)で、指定された初期状態から目標状態に至るまでの移動計画が存在するかを調べます。

pourを事実として登録する際、その事実を代表する文字列を指定することで、後で抽出しやすくなります。

fp = Fixedpoint()
fp.set(engine='datalog')
fp.set('datalog.generate_explanations', True)
s = BitVecSort(5)
pour = Function('pour', s, s, s, s, s, s, BoolSort())
plan = Function('plan', s, s, s, s, s, s, BoolSort())
a, b, c, a1, b1, c1, a2, b2, c2 = Consts(['a', 'b', 'c', 'a1', 'b1', 'c1', 'a2', 'b2', 'c2'], s)
fp.register_relation(pour, plan)
fp.declare_var(a, b, c, a1, b1, c1, a2, b2, c2)

fp.rule(
    plan(a, b, c, a2, b2, c2),
    pour(a, b, c, a2, b2, c2)
)

fp.rule(
    plan(a, b, c, a2, b2, c2),
    [pour(a, b, c, a1, b1, c1), plan(a1, b1, c1, a2, b2, c2)]
)

for (a1, b1, c1), (a2, b2, c2) in transitions:
    fp.fact(pour(a1, b1, c1, a2, b2, c2), f'pour: {a1}, {b1}, {c1} -> {a2}, {b2}, {c2}')

%time print(fp.query(plan(8, 0, 0, 4, 4, 0)))
ans = fp.get_answer()
sat
CPU times: total: 31.2 ms
Wall time: 32 ms

次に作成したルールと事実を確認します。

for rule in fp.get_rules()[:5]:
    print(rule)
print("...")
ForAll([A, B, C, D, E, F],
       Implies(pour(A, B, C, D, E, F),
               plan(A, B, C, D, E, F)))
ForAll([A, B, C, D, E, F, G, H, I],
       Implies(And(plan(D, E, F, G, H, I),
                   pour(A, B, C, D, E, F)),
               plan(A, B, C, G, H, I)))
pour(0, 5, 3, 3, 5, 0)
pour(0, 5, 3, 5, 0, 3)
pour(1, 4, 3, 0, 5, 3)
...

次に、pour:から始まるノードを抽出し、逆順で表示します。これが初期状態から目標状態までのルートになります。

steps = []
for node in iter_all_children(ans):
    if str(node).startswith("pour:"):
        steps.append(node)

for step in steps[::-1]:
    print(step)
pour: 8, 0, 0 -> 3, 5, 0
pour: 3, 5, 0 -> 3, 2, 3
pour: 3, 2, 3 -> 6, 2, 0
pour: 6, 2, 0 -> 6, 0, 2
pour: 6, 0, 2 -> 1, 5, 2
pour: 1, 5, 2 -> 1, 4, 3
pour: 1, 4, 3 -> 4, 4, 0

pour述語をルールにする#

上の節では、Pythonのコードですべての辺を計算し、pour述語を事実として登録しました。今回はpour述語をルールとして登録し、state述語を事実として登録します。

fp = Fixedpoint()
fp.set(engine='datalog')
fp.set('datalog.generate_explanations', True)

s = BitVecSort(5)
state = Function('state', s, s, s, BoolSort())
pour = Function('pour', s, s, s, s, s, s, BoolSort())
plan = Function('plan', s, s, s, s, s, s, BoolSort())
a, b, c, a1, b1, c1, a2, b2, c2 = Consts(['a', 'b', 'c', 'a1', 'b1', 'c1', 'a2', 'b2', 'c2'], s)
fp.register_relation(state, pour, plan)
fp.declare_var(a, b, c, a1, b1, c1, a2, b2, c2)

fp.rule(
    plan(a, b, c, a2, b2, c2),
    pour(a, b, c, a2, b2, c2)
)

fp.rule(
    plan(a, b, c, a2, b2, c2),
    [pour(a, b, c, a1, b1, c1), plan(a1, b1, c1, a2, b2, c2)]
)

fp.rule(
    pour(a, b, c, a1, b1, c1),
    [
        state(a, b, c),
        state(a1, b1, c1),
         Or(
             And(a == a1, b != b1, Or(b1 == 0, b1 == 5, c1 == 0, c1 == 3)),
             And(b == b1, a != a1, Or(a1 == 0, a1 == 8, c1 == 0, c1 == 3)),
             And(c == c1, a != a1, Or(a1 == 0, a1 == 8, b1 == 0, b1 == 5)),
         )
    ], 
)

for a, b, c in product(range(9), range(6), range(4)):
    if a + b + c == 8:
        fp.fact(state(a, b, c), f'{a}, {b}, {c}')

%time print(fp.query(plan(8, 0, 0, 4, 4, 0)))
ans = fp.get_answer()
sat
CPU times: total: 438 ms
Wall time: 460 ms
steps = []
for node in iter_all_children(ans):
    if str(node).startswith("pour("):
        steps.append(f'{node.arg(0)} -> {node.arg(1)}')

for step in steps[::-1]:
    print(step)
8, 0, 0 -> 3, 5, 0
3, 5, 0 -> 3, 2, 3
3, 2, 3 -> 6, 2, 0
6, 2, 0 -> 6, 0, 2
6, 0, 2 -> 1, 5, 2
1, 5, 2 -> 1, 4, 3
1, 4, 3 -> 4, 4, 0

次はstate述語を宣言します。state(a, b, c) は、バケツ a, b, c の水の状態を表します。

s = BitVecSort(5)
state = Function('state', s, s, s, BoolSort())

次はpour述語のルールです。このルールでは、state(a, b, c)state(a1, b1, c1) の間で、どのバケツからどのバケツへ水を移動できるかを定義しています。移動可能な条件として、特定のバケツに水を移動できるかどうかを OrAnd を使って指定しています。

fp.rule(
   pour(a, b, c, a1, b1, c1),
   [
       state(a, b, c),
       state(a1, b1, c1),
       Or(
           And(a == a1, b != b1, Or(b1 == 0, b1 == 5, c1 == 0, c1 == 3)),
           And(b == b1, a != a1, Or(a1 == 0, a1 == 8, c1 == 0, c1 == 3)),
           And(c == c1, a != a1, Or(a1 == 0, a1 == 8, b1 == 0, b1 == 5)),
       )
   ],
)

最後に、stateを事実として登録します。水の量が合計8リットルである状態をすべて列挙します。

for a, b, c in product(range(9), range(6), range(4)):
   if a + b + c == 8:
       fp.fact(state(a, b, c), f'{a}, {b}, {c}')

state述語をルールにする#

次のコードでは、state述語もルールとして定義します。この場合、bucket_abucket_bbucket_cの三つの述語を定義し、これらを使って各バケツに可能な水の量を事実として登録します。

fp = Fixedpoint()
fp.set(engine='datalog')
fp.set('datalog.generate_explanations', True)
s = BitVecSort(5)

bucket_a = Function('bucket_a', s, BoolSort())
bucket_b = Function('bucket_b', s, BoolSort())
bucket_c = Function('bucket_c', s, BoolSort())
state = Function('state', s, s, s, BoolSort())
pour = Function('pour', s, s, s, s, s, s, BoolSort())
plan = Function('plan', s, s, s, s, s, s, BoolSort())
a, b, c, a1, b1, c1, a2, b2, c2 = Consts(['a', 'b', 'c', 'a1', 'b1', 'c1', 'a2', 'b2', 'c2'], s)
fp.register_relation(state, pour, plan, bucket_a, bucket_b, bucket_c)
fp.declare_var(a, b, c, a1, b1, c1, a2, b2, c2)

fp.rule(
    plan(a, b, c, a2, b2, c2),
    pour(a, b, c, a2, b2, c2)
)

fp.rule(
    plan(a, b, c, a2, b2, c2),
    [pour(a, b, c, a1, b1, c1), plan(a1, b1, c1, a2, b2, c2)]
)

fp.rule(
    pour(a, b, c, a1, b1, c1),
    [
        state(a, b, c),
        state(a1, b1, c1),
         Or(
             And(a == a1, b != b1, Or(b1 == 0, b1 == 5, c1 == 0, c1 == 3)),
             And(b == b1, a != a1, Or(a1 == 0, a1 == 8, c1 == 0, c1 == 3)),
             And(c == c1, a != a1, Or(a1 == 0, a1 == 8, b1 == 0, b1 == 5)),
         )
    ], 
)

fp.rule(
    state(a, b, c),
    [bucket_a(a), bucket_b(b), bucket_c(c), a + b + c == 8],
)

for i in range(sizes[0] + 1):
    fp.fact(bucket_a(i), str(i))

for i in range(sizes[1] + 1):
    fp.fact(bucket_b(i), str(i))

for i in range(sizes[2] + 1):
    fp.fact(bucket_c(i), str(i))

%time print(fp.query(plan(8, 0, 0, 4, 4, 0)))
ans = fp.get_answer()
sat
CPU times: total: 5.97 s
Wall time: 6.31 s

同じ結果が得られますが、かなり時間がかかりました。原因は、state述語のルールを追加したことで、計算の対象となる状態の数が増えたため、探索すべき状態が増加したことです。そのため、計算に時間がかかるようになりました。

steps = []
for node in iter_all_children(ans):
    if str(node).startswith("pour("):
        node1 = ", ".join(map(str, node.arg(0).children()))
        node2 = ", ".join(map(str, node.arg(1).children()))
        steps.append(f"{node1} -> {node2}")

for step in steps[::-1]:
    print(step)
8, 0, 0 -> 3, 5, 0
3, 5, 0 -> 3, 2, 3
3, 2, 3 -> 6, 2, 0
6, 2, 0 -> 6, 0, 2
6, 0, 2 -> 1, 5, 2
1, 5, 2 -> 1, 4, 3
1, 4, 3 -> 4, 4, 0