diff --git a/bruteforce.py b/bruteforce.py index 5d3e9af..d8bca79 100644 --- a/bruteforce.py +++ b/bruteforce.py @@ -1,123 +1,173 @@ from itertools import product +import multiprocessing +import time from functools import lru_cache # For memoization # Define the area size for each gate type +amount_of_processes = 27 #how many processors to assign. I recommend 3 or 9 as there are 9 gates and the search space is created by dividing the top level of gates therefore divide 9 by the amount of processors + @lru_cache(None) def evaluate_circuit(circuit, inputs): intermediate_outputs = list(inputs[:]) for gate, in1, in2 in circuit: intermediate_out1 = intermediate_outputs[in1] intermediate_out2 = intermediate_outputs[in2] if gate == 'AND': output = intermediate_out1 & intermediate_out2 elif gate == 'OR': output = intermediate_out1 | intermediate_out2 elif gate == 'XOR': output = intermediate_out1 ^ intermediate_out2 elif gate == 'XNOR': output = ~(intermediate_out1 ^ intermediate_out2) & 1 elif gate == 'NAND': output = ~(intermediate_out1 & intermediate_out2) & 1 elif gate == 'NOR': output = ~(intermediate_out1 | intermediate_out2) & 1 elif gate == 'INV': output = ~intermediate_out1 & 1 elif gate == 'ANDNOT': output = intermediate_out1 & (~intermediate_out2 & 1) elif gate == 'ORNOT': output = intermediate_out1 | (~intermediate_out2 & 1) # & 1 to ensure output is 0 or 1 else it will be -1 else: output = 0 intermediate_outputs.append(output) return intermediate_outputs[-1] def brute_force_boolean(target_function, num_inputs, max_gates, max_area): truth_table = list(product([0, 1], repeat=num_inputs)) target_outputs = [target_function(input_comb) for input_comb in truth_table] print("Truth Table and Target Outputs:") print("Inputs\t\tOutput") for inputs, output in zip(truth_table, target_outputs): print(f"{inputs}\t{output}") gate_types = { 'INV': 1160, 'AND': 3472, 'NAND': 2832, 'OR': 3472, 'NOR': 2832, 'XOR': 4632, 'XNOR': 4632, 'ANDNOT': 3472, 'ORNOT': 3472 } best_circuit = None best_area = max_area - - i = 0 for num_gates in range(1, max_gates + 1): print("num_gates: ", num_gates, " building circuits") - for circuit in generate_all_circuits(num_inputs, num_gates, gate_types, best_area): - total_area = sum(gate_types[gate] for gate, _, _ in circuit) - if(i % 100000 == 0): - print(f"Circuit: {i:,} at num_gates: {num_gates} with area: {total_area}") - i += 1 - if total_area > best_area: - continue - if all(evaluate_circuit(tuple(circuit), inputs) == target_output - for inputs, target_output in zip(truth_table, target_outputs)): - if not best_circuit or total_area < best_area: + tic = time.perf_counter() + with multiprocessing.Pool(processes=amount_of_processes) as pool: + results = [] + + for worker_id in range(amount_of_processes): + results.append(pool.apply_async(search_space_worker, (num_inputs, num_gates, gate_types, best_area, truth_table, target_outputs, worker_id))) + + + + for result in results: + circuit, area = result.get() + if circuit and area < best_area: best_circuit = circuit - best_area = total_area - print("New best circuit found with area", best_area, ":", best_circuit) - else: - print("Circuit with area", total_area, ":") - for gate, in1, in2 in circuit: - print(f" Gate: {gate}, Input1: {in1}, Input2: {in2}") - + best_area = area + + toc = time.perf_counter() + print(f"all processes ran together for {toc - tic:0.4f} seconds") + + return best_circuit, best_area + +def search_space_worker(num_inputs, num_gates, gate_types, best_area, truth_table, target_outputs, worker_id): + tic = time.perf_counter() + + best_circuit = None + i = 1 + + tic2 = time.perf_counter() + for circuit in generate_all_circuits(num_inputs, num_gates, gate_types, best_area, worker_id): + total_area = sum(gate_types[gate] for gate, _, _ in circuit) + if(i % 100000 == 0): + print(f"Circuit: {i:,} at num_gates: {num_gates} with area: {total_area} in process: {worker_id} with last gate: {circuit[-1]} and total time taken: {time.perf_counter() - tic2:0.4f} seconds") + tic2 = time.perf_counter() + i += 1 + if total_area > best_area: + continue + if all(evaluate_circuit(tuple(circuit), inputs) == target_output + for inputs, target_output in zip(truth_table, target_outputs)): + if not best_circuit or total_area < best_area: + best_circuit = circuit + best_area = total_area + print("New best circuit found with area", best_area, ":", best_circuit) + else: + print("Circuit with area", total_area, " on process ",worker_id,":") + for gate, in1, in2 in circuit: + print(f" Gate: {gate}, Input1: {in1}, Input2: {in2}") + + + toc = time.perf_counter() + print(f"processes { worker_id } ran for {toc - tic:0.4f} seconds meaning {((toc - tic)/i):0.4f} seconds per circuit with {i} circuits best area: {best_area}") + return best_circuit, best_area -def generate_all_circuits(num_inputs, num_gates, gate_types, best_area): +def generate_all_circuits(num_inputs, num_gates, gate_types, best_area, worker_id): """ circuit: [(gate, in1, in2), ...] where gate is one of the gate types and in1, in2 are indices of the inputs or intermediate outputs """ base_indices = list(range(num_inputs)) - def recursive_build(current_circuit, available_indices, current_area, depth): + def recursive_build(current_circuit, available_indices, current_area, depth, worker_id): if depth == num_gates: outputs_used = set() for _, in1, in2 in current_circuit: outputs_used.add(in1) outputs_used.add(in2) all_outputs_used = True for i in range(len(current_circuit)-1): if i + num_inputs not in outputs_used: all_outputs_used = False break if all_outputs_used or num_gates == 1: yield current_circuit return for gate, area in gate_types.items(): + if amount_of_processes <= 9: + #less then 9 processes so split the set on outer gate type + outer_subset = list(gate_types.keys())[worker_id::amount_of_processes] + else: + #more than 9 processes so split on outer and divide second to outer layer. + secondouter_subset = list(gate_types.keys())[worker_id % 3::3] + outer_subset = list(gate_types.keys())[worker_id // 3::9] + + if(depth+2) == num_gates and gate not in secondouter_subset: + #print("skipping gate: ", gate, "on process: ", worker_id) + continue + + if (depth+1) == num_gates and gate not in outer_subset: + #print("skipping gate: ", gate, "on process: ", worker_id) + continue + if current_area + area > best_area: continue for in1 in available_indices: for in2 in available_indices: #skip redundant gate if it is an INV with different inputs, or if it is a gate with the same, or if it is symmetric on a normal gate if (in1 == in2 and gate != 'INV') or (in1 != in2 and gate == 'INV') or (in1 > in2 and (gate != 'ANDNOT' or gate != 'ORNOT')): # Skip redundant gates continue new_circuit = current_circuit + [(gate, in1, in2)] new_indices = available_indices + [len(available_indices)] - yield from recursive_build(new_circuit, new_indices, current_area + area, depth + 1) + yield from recursive_build(new_circuit, new_indices, current_area + area, depth + 1, worker_id) - return recursive_build([], base_indices, 0, 0) + return recursive_build([], base_indices, 0, 0, worker_id) # Example: Target function for 2-input XOR -target_function = lambda x: x[0] ^ x[1] ^ x[2] -best_circuit, best_area = brute_force_boolean(target_function, num_inputs=3, max_gates=6, max_area=100000000) +target_function = lambda x: x[0] ^ x[1] ^ x[2] ^ x[3] +best_circuit, best_area = brute_force_boolean(target_function, num_inputs=4, max_gates=8, max_area=100000000) print("Best circuit:", best_circuit, "with area: ", best_area)