Remove random python scripts

This commit is contained in:
Ivan Malison 2021-08-01 01:04:45 -06:00
parent cce7cb154e
commit 8a3457a28c
No known key found for this signature in database
GPG Key ID: 62530EFBE99DC2F8
59 changed files with 0 additions and 4227 deletions

View File

@ -1,69 +0,0 @@
def memoize(f):
memo = {}
def helper(*args):
args_tuple = tuple(args)
if args_tuple not in memo:
memo[args_tuple] = f(*args)
return memo[args_tuple]
return helper
def can_i_win(max_value, available_numbers):
if not isinstance(available_numbers, frozenset):
available_numbers = frozenset(available_numbers)
return _can_i_win(max_value, available_numbers, 0)
@memoize
def _can_i_win(max_value, available_numbers, current_value):
for number in available_numbers:
new_value = current_value + number
if new_value > max_value:
continue
new_numbers = available_numbers - frozenset([number])
can_win, _ = _can_i_win(max_value, new_numbers, new_value)
if not can_win:
return (True, number)
return (False, list(iter(available_numbers))[0])
def play_game(max_value, max_number, computer_parity=0):
available_numbers = frozenset(range(max_number))
move_count = 0
count = 0
while True:
print("Count is {0}, numbers are {1}, max is {2}".format(
count, available_numbers, max_value
))
computer_turn = move_count % 2 == computer_parity
if move_count % 2 == computer_parity:
can_win, number = _can_i_win(max_value, available_numbers, count)
print("Computer thinks it can win: {0}".format(can_win))
else:
number = get_valid_selection(available_numbers)
count += number
available_numbers -= frozenset([number])
move_count += 1
print("{0} selected, count is now {1}".format(number, count))
if count > max_value:
if computer_turn:
print("You win")
else:
print("You lose")
return
print("------------------------------------------------------")
def get_valid_selection(valid_numbers):
while True:
number = int(input(
"Enter one of the following numbers {0}:\n".format(valid_numbers)
))
if number in valid_numbers:
return number
if __name__ == '__main__':
play_game(14, 7, computer_parity=1)

View File

@ -1,43 +0,0 @@
def count_unique_sums2(number, maximum_size):
maximum_to_try = min(number, maximum_size) + 1
the_range = list(range(1, maximum_to_try))
all_sums = []
for max_in_sum in the_range:
if max_in_sum == number:
all_sums.extend([[number]])
continue
new_sums = count_unique_sums(number-max_in_sum, max_in_sum)
all_sums.extend(
[
sum_so_far + [max_in_sum]
for sum_so_far in new_sums
]
)
return all_sums
unique_sum_counts = {}
def count_unique_sums(number, maximum_size):
if (number, maximum_size) in unique_sum_counts:
return unique_sum_counts[(number, maximum_size)]
maximum_to_try = min(number, maximum_size) + 1
the_range = list(range(1, maximum_to_try))
sum_count = 0
for max_in_sum in the_range:
if max_in_sum == number:
sum_count += 1
continue
sum_count += count_unique_sums(number-max_in_sum, max_in_sum)
unique_sum_counts[(number, maximum_size)] = sum_count
return sum_count
if __name__ == '__main__':
print(count_unique_sums(100, 100))

View File

@ -1,73 +0,0 @@
import random
import math
class BirthdayProblem(object):
def __init__(self):
pass
def birthday_problem(problem_size=365):
birthdays = set()
while True:
new_birthday = random.randint(1, problem_size)
if new_birthday in birthdays:
return len(birthdays) + 1
birthdays.add(new_birthday)
def theoretical_average(problem_size):
probabilities = []
contributions = []
for n in range(1, problem_size):
probability = (float(n-1) / problem_size) * falling_factorial_over_exponentiation(problem_size, n-1)
contribution = n * probability
probabilities.append(probability)
contributions.append(contribution)
return sum(contributions)
def falling_factorial(n, k):
product = 1
while k > 0:
product *= n
n -= 1
k -= 1
return product
def falling_factorial_over_exponentiation(n, k):
orig = n
product = float(1)
while k > 0:
product *= n
product = product/orig
n -= 1
k -= 1
return product
def run_birthday_problem_n_times(times_to_run, problem_size=365):
return [birthday_problem(problem_size) for i in range(int(times_to_run))]
def number_of_people_to_times_occured(runs):
number_of_people_to_times_occured = {}
for run in runs:
number_of_people_to_times_occured[run] = number_of_people_to_times_occured.get(run, 0) + 1
if __name__ == '__main__':
times_to_run = 131072
while times_to_run <= 131072:
for problem_size in range(4000, 5000, 100):
average = sum(run_birthday_problem_n_times(times_to_run, problem_size=problem_size))/float(times_to_run)
print "problem size {3} ran {0} times, average was {1}, theoretical average is {2}".format(
times_to_run,
average,
theoretical_average(problem_size),
problem_size
)
print math.fabs(average - theoretical_average(problem_size))
times_to_run *= 2

View File

@ -1,123 +0,0 @@
import random
import enum
import itertools
class Suit(enum.Enum):
CLUBS = 0
DIAMONDS = 1
HEARTS = 2
SPADES = 3
class Deck(object):
@classmethod
def random_deck(cls):
return cls(generate_cards())
def __init__(self, cards):
self._cards = cards
self.top = 0
def pop_top(self):
if self.top >= 52:
raise Exception()
card = get_card(self._cards[self.top])
self.top += 1
return card
def get_card(card_number):
return (Suit(card_number // 13), card_number % 13)
def random_permutation_of_size(n):
remaining = list(range(n))
for i in range(n-1, -1, -1):
yield remaining.pop(random.randint(0, i))
def random_permutation(the_list):
for index in random_permutation_of_size(len(the_list)):
yield the_list[index]
def generate_cards(num_cards=52):
return list(random_permutation(range(num_cards)))
def card_value(card_number):
if card_number >= 10:
return 10
return card_number + 1
def card_string(card_number):
if card_number == 12:
return 'K'
elif card_number == 11:
return 'Q'
elif card_number == 10:
return 'J'
return str(card_number + 1)
def get_hand_value(hand):
number_of_aces = 0
total_value = 0
for _, card_number in hand:
if card_number == 0:
number_of_aces += 1
else:
total_value += card_value(card_number)
while total_value < 10 - (number_of_aces - 1):
total_value += 11
number_of_aces -= 1
total_value += number_aces
return total_value
class Blackjack(object):
def __init__(self, deck=None):
self._deck = deck or Deck.random_deck()
self.initialize_game()
def initialize_game(self):
self.dealer_hand = [self._deck.pop_top() for _ in range(2)]
self.player_hand = [self._deck.pop_top() for _ in range(2)]
def hit(self):
self.player_hand.append(self._deck.pop_top())
def run_dealer(self):
while get_hand_value(self.dealer_hand) < 17:
self.dealer_hand.append(self._deck.pop_top())
class UserHandler(object):
def __init__(self, game=None):
self._game = game or Blackjack()
def print_game_state(self):
print(self.game_string())
def game_string(self):
return "\n".join([
self.dealer_string(), self.hand_string(self._game.player_hand),
])
def dealer_string(self):
return "X {0}".format(self.hand_string(self._game.dealer_hand[1:]))
def hand_string(self, cards):
return " ".join(card_string(card[1]) for card in cards)
if __name__ == '__main__':
UserHandler().print_game_state()
the_cards = UserHandler()._game._deck._cards
print (the_cards)
print(set(the_cards) == set(range(52)))

View File

@ -1,168 +0,0 @@
#! /usr/bin/env python
import random
class TrieNode(object):
def __init__(self, children=None, is_word=False, word=''):
self.children = children or {}
self.is_word = is_word
self.word = word
def has_word(self, word):
node, rem = self.get(word)
if rem:
return False
return node.is_word
def get(self, suffix):
if not suffix:
return self, suffix
character = suffix[0]
if character in self.children:
node = self.children[character]
return node.get(suffix[1:])
return self, suffix
def add(self, word):
node, suffix = self.get(word)
if not suffix:
node.is_word = True
return node
character = suffix[0]
new_node = type(self)(word=node.word+character)
node.children[character] = new_node
new_node.add(suffix[1:])
return new_node
def build_board(size=4):
return [
[chr(random.randint(97, 97+25)) for i in range(size)]
for j in range(size)
]
def board_string(board_to_draw):
border_line = "+{0}+".format((len(board_to_draw[0]) * 2 - 1) * "-")
return "{border_line}\n{contents}\n{border_line}".format(
contents="\n".join("|{0}|".format(" ".join(letter_line))
for letter_line in board_to_draw),
border_line=border_line,
)
def build_trie():
node = TrieNode()
with open('/usr/share/dict/words') as the_file:
for word in the_file.readlines():
node.add(word.lower().strip())
return node
unit = (1, 0, -1)
class Boggle(object):
deltas = [(i, j) for i in unit for j in unit]
@classmethod
def new_random(cls, trie):
return cls(build_Board(), trie)
def __init__(self, board, trie):
self.height = len(board)
self.width = len(board[0])
self.board = board
self.trie = trie
def run(self):
for i in range(self.width):
for j in range(self.height):
for word in self.find(i, j):
if len(word) > 2:
yield word
def adjacency(self, i, j):
for i_d, j_d in self.deltas:
new_i = i_d + i
new_j = j_d + j
if 0 <= new_i < self.height and 0 <= new_j < self.width:
yield new_i, new_j
def find(self, i, j, trie=None, visited=None, current_word=''):
trie = trie or self.trie
visited = visited or set()
visited = set(visited)
characters = self.board[i][j]
visited.add((i, j))
new_trie = trie
for character in characters:
if character in new_trie.children:
new_trie = new_trie.children[character]
else:
break
else:
current_word += characters
new_trie = trie.children[character]
if new_trie.is_word:
yield current_word
for new_i, new_j in self.adjacency(i, j):
if (new_i, new_j) in visited:
continue
new_visited = set(visited)
for word in self.find(new_i, new_j, trie=new_trie,
visited=new_visited, current_word=current_word):
yield word
boggle_dice = [
["a", "a", 'e', 'e', 'g', 'n'],
["e", "l", 'r', 't', 't', 'y'],
["a", "o", 'o', 't', 't', 'w'],
["a", "b", 'b', 'j', 'o', 'o'],
["e", "h", 'r', 't', 'v', 'w'],
["c", "i", 'm', 'o', 't', 'u'],
["d", "i", 's', 't', 't', 'y'],
["e", "i", 'o', 's', 's', 't'],
["d", "e", 'l', 'r', 'v', 'y'],
["a", "c", 'h', 'o', 'p', 's'],
["h", "i", 'm', 'n', 'qu', 'u'],
["e", "e", 'i', 'n', 's', 'u'],
["e", "e", 'g', 'h', 'n', 'w'],
["a", "f", 'f', 'k', 'p', 's'],
["h", "l", 'n', 'n', 'r', 'z'],
["d", "e", 'i', 'l', 'r', 'x'],
]
def random_permutation_of_size(n):
remaining = list(range(n))
for i in range(n-1, -1, -1):
yield remaining.pop(random.randint(0, i))
def random_permutation(the_list):
for index in random_permutation_of_size(len(the_list)):
yield the_list[index]
def chunks(l, n):
for i in range(0, len(l), n):
yield l[i:i + n]
def roll_dice(dice):
for die in random_permutation(dice):
yield random.choice(die)
def build_board_from_dice_roll(dice=None, row_size=4):
dice = dice or boggle_dice
return list(chunks(list(roll_dice(dice)), row_size))
if __name__ == '__main__':
dict_trie = build_trie()
board = build_board_from_dice_roll()
print(board_string(board))
print(list(Boggle(board, dict_trie).run()))

View File

@ -1,231 +0,0 @@
import bisect
class BTreePrinter(object):
number_width = 4
subtree_space = object()
def __init__(self, btree):
self.btree = btree
def determine_width_of_node(self, node):
if node is None: return 0
return sum(map(self.determine_width_of_node, node.nodes)) + node.number_of_value_nodes * self.number_width
def determine_width_of_value_node(self, value_node):
return self.determine_width_of_node(value_node.node) if value_node.node is not None else self.number_width
def print_tree(self):
return self.print_levels_recursively([self.btree.head])
def print_levels_recursively(self, level):
if all(map(lambda x: x is self.subtree_space, level)): return
self.print_nodes_at_level(level)
print ''
self.print_levels_recursively(self.get_next_level(level))
def get_next_level(self, level):
new_level = []
for item in level:
if item is self.subtree_space:
new_level.append(item)
elif item is not None:
new_level.extend(item.nodes)
new_level.append(self.subtree_space)
return new_level
def print_nodes_at_level(self, level):
for item in level:
if item is self.subtree_space:
print ' ' * self.number_width,
else:
self.print_values_for_node(item)
def print_values_for_node(self, node):
if node is None: return
for value_node in node.value_nodes:
print ' ' * self.determine_width_of_node(value_node.node),
print '{num: ^{width}}'.format(num=value_node.value, width=self.number_width),
print (' ' * (self.determine_width_of_node(node.rightmost_node))),
class IntegrityChecker(object):
def __init__(self, btree):
self.btree = btree
def check_integrity(self):
return self.check_for_items_smaller_in_right_subtree(self.btree.head) and self.check_for_unmatched_parents(self.btree.head)
def check_for_unmatched_parents(self, subtree):
if subtree is None:
return True
for node in subtree.nodes:
if node is None:
continue
if node.parent is not subtree:
return False
if not self.check_for_unmatched_parents(node):
return False
return True
def check_for_items_smaller_in_right_subtree(self, subtree):
if subtree is None:
return True
small_value = subtree.value_nodes[0].value
for value_node in subtree.value_nodes[1:]:
if not self.check_subtree_has_no_items_smaller_than(value_node.node, small_value):
return False
if not self.check_for_items_smaller_in_right_subtree(subtree.value_nodes[0].node):
return False
return self.check_subtree_has_no_items_smaller_than(subtree.rightmost_node, small_value)
def check_subtree_has_no_items_smaller_than(self, subtree, value):
if subtree is None:
return True
for value_node in subtree.value_nodes:
if value > value_node.value:
return False
if not self.check_subtree_has_no_items_smaller_than(value_node.node, value):
return False
return self.check_subtree_has_no_items_smaller_than(subtree.rightmost_node, value)
class BTree(object):
@classmethod
def build_with_value(cls, value):
btree = cls()
btree.head = Node(btree, [ValueNode(value)])
return btree
def __init__(self):
self.head = None
self.inserted_items = []
def build_new_head(self, value_node):
new_rightmost_node = self.head
self.head = Node(self, [value_node])
value_node.node.parent = self.head
self.head.rightmost_node = new_rightmost_node
new_rightmost_node.parent = self.head
assert self.head.rightmost_node is not None
return value_node
def insert(self, value):
self.head.insert(value)
self.inserted_items.append(value)
self.head.check_integrity()
if not IntegrityChecker(self).check_integrity():
import ipdb; ipdb.set_trace()
promote_value_node = build_new_head
def __repr__(self):
return "BTree({0})".format(repr(self.head))
class ValueNode(object):
def __init__(self, value, node=None):
self.value = value
self.node = node
def __lt__(self, other):
return self.value < other.value
def __gt__(self, other):
return self.value > other.value
def __repr__(self):
return "ValueNode({0}, {1})".format(repr(self.node), repr(self.value))
class Node(object):
max_num_values = 3
def __init__(self, parent, value_nodes=None, rightmost_node=None):
self.parent = parent
self.value_nodes = value_nodes or []
self.rightmost_node = rightmost_node
self.claim_child_nodes()
def check_integrity(self):
if self.is_leaf_node: return True
if self.rightmost_node:
return all(child_node.check_integrity() for child_node in self.nodes if child_node is not None)
import ipdb; ipdb.set_trace()
return False
def claim_child_nodes(self):
for node in self.nodes:
if node:
node.parent = self
@property
def is_leaf_node(self):
return not any(self.nodes)
@property
def number_of_value_nodes(self):
return len(self.value_nodes)
@property
def nodes(self):
return [value_node.node for value_node in self.value_nodes] + [self.rightmost_node]
@property
def values(self):
return [value_node.value for value_node in self.value_nodes]
def __getitem__(self, item):
return self.nodes[item]
def promote_value_node(self, value_node):
bisect.insort(self.value_nodes, value_node)
if value_node.node:
value_node.node.parent = self
self.maybe_rebalance()
def maybe_rebalance(self):
if self.number_of_value_nodes < self.max_num_values:
return
value_node_to_promote = self.value_nodes[self.number_of_value_nodes/2]
promoted_nodes_old_node = value_node_to_promote.node
value_node_to_promote.node = Node(
self.parent,
value_nodes=self.value_nodes[:self.number_of_value_nodes/2],
rightmost_node=promoted_nodes_old_node
)
self.value_nodes = self.value_nodes[self.number_of_value_nodes/2+1:]
self.parent.promote_value_node(value_node_to_promote)
self.check_integrity()
def insert(self, value):
if self.is_leaf_node:
value_node = ValueNode(value)
bisect.insort(self.value_nodes, value_node)
self.maybe_rebalance()
return value_node
return self.pick_node(value).insert(value)
def pick_node(self, value):
if self.rightmost_node is None:
import ipdb; ipdb.set_trace()
for value_node in self.value_nodes:
if value < value_node.value:
return value_node.node
return self.rightmost_node
def __repr__(self):
return "Node({0}, {1})".format(", ".join(map(repr, self.value_nodes)), self.rightmost_node)

View File

@ -1,23 +0,0 @@
def left_partials(incoming):
product = 1
for i in incoming:
product *= i
yield product
def but_one(incoming):
"""Given an array `incoming` return an array whose ith index is the
sum of all the elements of `incoming` except for `incoming[i]`
"""
lpartials = list(left_partials(incoming))
rproduct = 1
result = [None] * len(incoming)
for i in range(len(incoming)):
back_index = len(incoming) - i - 1
if back_index > 0:
result[back_index] = rproduct * lpartials[back_index-1]
if back_index < len(incoming):
rproduct *= incoming[back_index]
else:
result[back_index] = rproduct
return result

View File

@ -1,42 +0,0 @@
import inspect
class cached_property(object):
"""Descriptor that caches the result of the first call to resolve its
contents.
"""
def __init__(self, func):
self.__doc__ = getattr(func, '__doc__')
self.func = func
def __get__(self, obj, cls):
if obj is None:
return self
value = self.func(obj)
setattr(obj, self.func.__name__, value)
return value
def bust_self(self, obj):
"""Remove the value that is being stored on `obj` for this
:class:`.cached_property`
object.
:param obj: The instance on which to bust the cache.
"""
if self.func.__name__ in obj.__dict__:
delattr(obj, self.func.__name__)
@classmethod
def bust_caches(cls, obj, excludes=()):
"""Bust the cache for all :class:`.cached_property` objects on `obj`
:param obj: The instance on which to bust the caches.
"""
for name, _ in cls.get_cached_properties(obj):
if name in obj.__dict__ and name not in excludes:
delattr(obj, name)
@classmethod
def get_cached_properties(cls, obj):
return inspect.getmembers(type(obj), lambda x: isinstance(x, cls))

View File

@ -1,25 +0,0 @@
import operator
def generate_decreasing_n_sequence_with_bounded_sum(
sequence_length, sum_bound, value_bound=float('inf'),
):
if sequence_length == 0:
yield []
return
min_remaining = sequence_length*(sequence_length - 1)/2
bound_for_current = min(sum_bound - min_remaining, value_bound)
for value in range(sequence_length, bound_for_current):
for sequence in generate_decreasing_n_sequence_with_bounded_sum(
sequence_length - 1, sum_bound - value, value_bound=value,
):
yield [value] + sequence
def build_products_to_sequences_map():
product_to_sequences_map = {}
for sequence in generate_decreasing_n_sequence_with_bounded_sum(4, 18):
product = reduce(operator.mul, sequence, 1)
product_to_sequences_map.setdefault(product, []).append(sequence)
return product_to_sequences_map

View File

@ -1,42 +0,0 @@
class CountCrimes(object):
def __init__(self, profits, groups, profit_needed, group_count):
self.profits = profits
self.groups = groups
self.crime_count = len(profits)
self.profit_needed = profit_needed
self.group_count = count
self.reset_cache()
def process_crime(self, profit, num_required):
for gangster_count in range(self.group_count, -1, -1):
for profit_amount in range(self.profit_needed, -1, -1):
new_gangster_count = gangster_count + num_required
new_profit = profit_amount + profit
if new_profit > self.profit_needed:
new_profit = self.profit_needed
new_count = self.cache[gangster_count][profit_amount]
if new_count > 0 and new_gangster_count <= self.group_count:
self.cache[new_gangster_count][new_profit] += new_count
def reset_cache(self):
self.cache = [[0 for _ in range(profit_needed + 1)]
for _ in range(group_count + 1)]
self.cache[0][0] = 1
def process_crimes(self):
for profit, num_required in zip(self.profits, self.groups):
self.process_crime(profit, num_required)
def get_count(self):
self.reset_cache()
self.process_crimes()
return self.count_ways()
def count_ways(self):
return sum(self.cache[i][self.profit_needed]
for i in range(self.group_count + 1))
if __name__ == '__main__':
print(CountCrimes().get_count())

View File

@ -1,58 +0,0 @@
#! /usr/bin/env python
import itertools
def textJustify(input_string, justification_length):
partitioning = partition_paragraph(input_string, justification_length)
return "\n".join(itertools.chain(
(justify_line(line_partition, justification_length)
for line_partition in partitioning[:-1]),
[" ".join(partitioning[-1])]
))
def justify_line(line_words, justification_length):
if len(line_words) == 1:
return line_words[0]
total_length = sum(len(word) for word in line_words)
word_count = len(line_words)
number_of_word_boundaries = word_count - 1
spaces_to_add = justification_length - total_length
base_spaces = spaces_to_add // number_of_word_boundaries
extra_spaces = spaces_to_add % number_of_word_boundaries
output_string = ""
for i, word in enumerate(line_words):
output_string += word
if i >= len(line_words) - 1:
break
space_count = base_spaces
if i < extra_spaces:
space_count += 1
spaces = " " * space_count
output_string += spaces
return output_string
def partition_paragraph(input_string, justification_length):
current_line_lenth = 0
partitioning = []
current = []
for word in input_string.split():
word_length = len(word)
length_with_word = current_line_lenth + word_length
if justification_length < length_with_word:
partitioning.append(current)
current = []
current_line_lenth = 0
length_with_word = word_length
current.append(word)
current_line_lenth = length_with_word + 1
if current:
partitioning.append(current)
return partitioning
if __name__ == '__main__':
sample = "Coursera provides universal access to the world's best education, partnering with to universities and organizations to offer courses online."
print(textJustify(sample, 10))

View File

@ -1,107 +0,0 @@
# neighbors = {
# 1: [6, 8],
# 2: [7, 9],
# 3: [8, 4],
# 4: [9, 3, 0],
# 5: [],
# 6: [0, 7, 1],
# 7: [2, 6],
# 8: [1, 3],
# 9: [4, 2],
# 0: [4, 6]
# }
# cache = {}
# def count_numbers(current_number, number_of_hops):
# cache_value = (current_number, number_of_hops)
# if cache_value in cache:
# return cache[cache_value]
# if number_of_hops == 1:
# return 1
# number_count = 0
# for neighbor in neighbors[current_number]:
# number_count += count_numbers(neighbor, number_of_hops - 1)
# cache[cache_value] = number_count
# return number_count
class DialpadCounter(object):
knight_deltas = [
(2, 1),
(-2, -1),
(-2, 1),
(2, -1),
(1, 2),
(-1, -2),
(-1, 2),
(1, -2)
]
def __init__(self, dialpad_matrix):
self._matrix = dialpad_matrix
self._row_size = len(dialpad_matrix[0])
self._row_count = len(dialpad_matrix)
self._cache = {}
def neighbors(self, y, x):
result = []
for delta_y, delta_x in self.knight_deltas:
neighbor_y = delta_y + y
neighbor_x = delta_x + x
neighbor = (neighbor_y, neighbor_x)
if (self.inbounds(neighbor_y, neighbor_x) and
self._matrix[neighbor_y][neighbor_x]):
result.append(neighbor)
return result
def inbounds(self, y, x):
return 0 <= x < self._row_size and 0 <= y < self._row_count
def count_numbers(self, coordinate, number_of_hops):
y, x = coordinate
if not self._matrix[y][x]:
raise Exception()
cache_value = (coordinate, number_of_hops)
if cache_value in self._cache:
return self._cache[cache_value]
if number_of_hops == 1:
return 1
number_count = 0
for neighbor in self.neighbors(y, x):
number_count += self.count_numbers(neighbor, number_of_hops - 1)
self._cache[cache_value] = number_count
return number_count
def count_numbers(number, number_of_hops):
matrix = [
[True, True, True],
[True, True, True],
[True, True, True],
[False, True, False]
]
if number == 0:
coordinate = 3, 1
else:
row = (number - 1) // 3
column = (number - 1) % 3
coordinate = (row, column)
counter = DialpadCounter(matrix)
return counter.count_numbers(coordinate, number_of_hops)
if __name__ == '__main__':
print(count_numbers(1, 1))
print(count_numbers(1, 2))
print(count_numbers(1, 3))
print(count_numbers(1, 4))
print(count_numbers(1, 10))
print(count_numbers(1, 30))

View File

@ -1,10 +0,0 @@
#!/usr/bin/env python
import sys
def escape(string):
print repr(string)
if __name__ == '__main__':
escape(sys.stdin.read())

View File

@ -1,56 +0,0 @@
class MazeSolver(object):
def __init__(self, maze):
self.maze = maze
self.row_length = len(maze[0])
self.column_length = len(maze)
self.visited = set()
@property
def finish(self):
return (self.column_length - 1, self.row_length - 1)
deltas = [(1, 0), (0, 1), (-1, 0), (0, -1)]
def is_in_bounds(self, location):
column_index, row_index = location
return (
0 <= column_index < self.column_length and
0 <= row_index < self.row_length
)
def find_adjacency(self, location):
for delta in self.deltas:
column_delta, row_delta = delta
column_location, row_location = location
new_column_location = column_location + column_delta
new_row_location = row_location + row_delta
adjacent_location = (new_column_location, new_row_location)
if (
self.is_in_bounds(adjacent_location) and
self.maze[new_column_location][new_row_location]
):
yield adjacent_location
def solve(self, current_location=(0, 0)):
if current_location == self.finish:
return [current_location]
self.visited.add(current_location)
for new_location in self.find_adjacency(current_location):
if new_location in self.visited:
continue
result = self.solve(new_location)
if result is not None:
return [current_location] + result
return None
if __name__ == '__main__':
maze = [
[1, 1, 1, 1, 1],
[0, 0, 1, 0, 0],
[1, 0, 1, 1, 1],
[1, 0, 0, 0, 1],
[1, 1, 1, 1, 1]
]
print(MazeSolver(maze).solve())

View File

@ -1,85 +0,0 @@
#! /usr/bin/env python
class GameOfLife(object):
neighbor_deltas = [
(1, 0), (1, 1), (1, -1),
(0, 1), (0, -1),
(-1, 0), (-1, 1), (-1, -1)
]
@classmethod
def empty_with_size(cls, rows, columns=None):
columns = columns or rows
return cls([])
@staticmethod
def build_empty_grid(rows, columns):
return [[False for _ in range(columns)] for _ in range(rows)]
def __init__(self, initial_state):
self.current_state = initial_state
self.row_count = len(initial_state)
self.column_count = len(initial_state[0])
def _neighbors(self, row, column):
for (row_delta, column_delta) in self.neighbor_deltas:
candidate_row = row + row_delta
candidate_column = column + column_delta
if self._in_bounds(candidate_row, candidate_column):
yield candidate_row, candidate_column
def _in_bounds(self, row, column):
return 0 <= row < self.row_count and 0 <= column < self.column_count
def _next_state_for_cell(self, row, column):
live_count = 0
cell_was_live = self.current_state[row][column]
for neighbor_row, neighbor_column in self._neighbors(row, column):
if self.current_state[neighbor_row][neighbor_column]:
live_count += 1
if cell_was_live:
return 1 < live_count < 4
else:
return live_count == 3
def compute_next_game_state(self, new_state=None):
new_state = new_state or self.build_empty_grid(
self.row_count, self.column_count,
)
for row in range(self.row_count):
for column in range(self.column_count):
new_state[row][column] = self._next_state_for_cell(row, column)
return new_state
def tick(self, new_state=None):
self.current_state = self.compute_next_game_state(new_state)
def _build_row_string(self, row):
return " ".join(["o" if state else "." for state in row])
@property
def state_string(self):
return "\n".join(
self._build_row_string(row) for row in self.current_state
)
@classmethod
def run(cls, initial_state, generations=30):
game = cls(initial_state)
for _ in range(generations):
game.tick()
print(game.state_string)
return game.current_state
sample_size = 50
sample_state = [
[False, True, False] + ([False] * (sample_size - 3)),
[False, False, True] + ([False] * (sample_size - 3)),
[True, True, True] + ([False] * (sample_size - 3)),
] + [[False] * sample_size for _ in range(sample_size - 3)]
if __name__ == '__main__':
GameOfLife.run(sample_state)

View File

@ -1,202 +0,0 @@
#!/usr/bin/env python
import optparse
import os
import re
import subprocess
def segment(iterable, segment_length):
if segment_length is None:
yield iterable
raise StopIteration
def yield_length():
for _ in xrange(segment_length):
yield iterable.next()
while True:
segment = list(yield_length())
if not segment:
raise StopIteration
yield segment
def build_file_extension_re(file_extensions):
return '.*\.(?:' + '|'.join(file_extensions) + ')'
class BlameCounter(object):
DIVIDER = '------------------------------'
committer_matcher = re.compile('\((.*?)\s*[0-9]{4}')
def __init__(
self,
search_expressions=(),
ignore_expressions=(),
filename_re='.*\.(?:py|tmpl)',
chunk_size=None,
):
self.path_matchers = [
re.compile(search_expression)
for search_expression in search_expressions
]
self.ignore_matchers = [
re.compile(ignore_expression)
for ignore_expression in ignore_expressions
]
self.filename_matcher = re.compile(filename_re)
self.chunk_size = chunk_size
self.blame_line_count_map = {}
def match_path_and_filename(self, path, filename):
filepath = os.path.join(path, filename)
return all(
bool(path_matcher.search(filepath)) for path_matcher in self.path_matchers
) and bool(self.filename_matcher.search(filename))
def get_matching_files(self):
for directory_path, directory_names, filenames in os.walk('.'):
for directory_name in directory_names:
if any(
ignore_matcher.search(directory_name)
for ignore_matcher in self.ignore_matchers
):
del directory_names[directory_names.index(directory_name)]
for filename in filenames:
if self.match_path_and_filename(directory_path, filename):
yield os.path.join(directory_path, filename)
def git_blame_files(self, filenames):
for filename in filenames:
if subprocess.call(
['git ls-files %s --error-unmatch' % filename],
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
):
continue
yield (filename, subprocess.Popen(
['git', 'blame', filename],
stdout=subprocess.PIPE
).communicate()[0])
def count_blame_lines(self):
for blame_output_chunk in segment(
self.git_blame_files(self.get_matching_files()),
self.chunk_size
):
self._count_blame_lines(blame_output_chunk)
if self.chunk_size:
self.print_results(
max_committers=50,
min_blame_lines=None
)
def _count_blame_lines(self, blame_outputs):
for _, blame_output in blame_outputs:
for line in blame_output.split('\n'):
match = self.committer_matcher.search(line)
if match:
committer = match.group(1)
self.blame_line_count_map[committer] = \
self.blame_line_count_map.setdefault(committer, 0) + 1
def get_blame_lines_in_files_by_comitters(self):
blame_count_in_files_by_committer = {}
for filename, blame_output in self.git_blame_files(self.get_matching_files()):
for line in blame_output.split('\n'):
match = self.committer_matcher.search(line)
if match:
committer = match.group(1)
committer_blame_lines = blame_count_in_files_by_committer.setdefault(
committer, {},
)
committer_blame_lines[filename] = committer_blame_lines.setdefault(
filename, 0,
) + 1
return blame_count_in_files_by_committer
def print_results(self, max_committers=None, min_blame_lines=None):
print self.DIVIDER
for (rank, (committer, blame_lines)) in enumerate(
sorted(
self.blame_line_count_map.iteritems(),
key=lambda x: x[1],
reverse=True
)
):
if rank is not None and rank == max_committers:
return
if min_blame_lines is None or blame_lines > min_blame_lines:
print str(rank + 1), committer, ': ', blame_lines
if __name__ == '__main__':
parser = optparse.OptionParser()
parser.add_option(
'--search-re',
action='append',
dest='search_expressions',
help='A regular expression to use when inspecting filepaths'
)
parser.add_option(
'--ignore-re',
action='append',
default=[],
dest='ignore_expressions',
help='Ignore directories matching this re.'
)
parser.add_option(
'-x',
action='append',
dest='file_extensions',
help=('Search for filenames with the given file extension. '
'Can be used multiple times.')
)
parser.add_option(
'--chunk-size',
dest='chunk_size',
type=int,
help='Print the rankings at intervals of CHUNK_SIZE files.'
)
parser.add_option(
'--committer-lines',
dest='committer_lines',
action='store_true',
default=False,
help=('Count blame lines for committer by file.')
)
(namespace, _) = parser.parse_args()
blame_counter_build_kwargs = {
'chunk_size': namespace.chunk_size,
'search_expressions': namespace.search_expressions,
'ignore_expressions': namespace.ignore_expressions
}
if namespace.file_extensions:
blame_counter_build_kwargs['filename_re'] = build_file_extension_re(
namespace.file_extensions
)
blame_counter = BlameCounter(**blame_counter_build_kwargs)
if namespace.committer_lines:
import operator
def sum_of_comitter_lines(committer_tuple):
_, blame_lines_by_file = committer_tuple
return sum(blame_count for filename, blame_count in blame_lines_by_file.iteritems())
blame_lines_in_files_by_committers = blame_counter.get_blame_lines_in_files_by_comitters()
blame_lines_in_files_by_comitters_sorted_by_total_count = sorted(
blame_lines_in_files_by_committers.iteritems(),
key=sum_of_comitter_lines,
reverse=True
)
sorted_blame_lines_in_files_by_comitters = [
(comitter, sorted(blame_lines_by_file.iteritems(), key=operator.itemgetter(1), reverse=True))
for comitter, blame_lines_by_file in blame_lines_in_files_by_comitters_sorted_by_total_count
]
import ipdb; ipdb.set_trace()
else:
blame_counter.count_blame_lines()
blame_counter.print_results()

View File

@ -1,46 +0,0 @@
#!/usr/bin/env python
import argparse
from iterpipes import *
class GitDiffReplacer(object):
def __init__(self, string_to_replace, replacing_string,
source_ref='HEAD~1', destination_ref='HEAD',
verbose=False):
self.source_ref = source_ref
self.destination_ref = destination_ref
self.string_to_replace = string_to_replace
self.replacing_string = replacing_string
self.verbose = verbose
@property
def modified_files_command(self):
return linecmd('git diff {} {} --name-only', self.source_ref,
self.destination_ref)
def git_diff_command(self, filename):
return cmd('git diff {}:{} {}:{}', self.source_ref, filename.strip(),
self.destination_ref, self.perform_substitutions(filename).strip())
def perform_substitutions(self, filename):
return filename.replace(self.string_to_replace, self.replacing_string)
def filter_filenames(self, filenames):
for filename in filenames:
if not self.replacing_string in filename:
yield filename
def run(self):
return '\n'.join([
list(run(self.git_diff_command(filename)))[0]
for filename in self.filter_filenames(
run(self.modified_files_command)
)
])
if __name__ == '__main__':
import sys
print GitDiffReplacer(sys.argv[1], sys.argv[2]).run()

View File

@ -1,27 +0,0 @@
import random
class HatsProblem(object):
def __init__(self, size):
self.size = size
def build_hats(self):
return [self.hat() for _ in range(self.size)]
def hat(self):
return random.randint(0, self.size - 1)
def go(self):
hats = self.build_hats()
guesses = [
self.calculate_guess_modulus(self.sum_of_all_but_i(i, hats), i)
for i in range(self.size)
]
return zip(hats, guesses)
def calculate_guess_modulus(self, current, desired):
return ((desired - current) + self.size) % self.size
def sum_of_all_but_i(self, i, hats):
return sum(hat for index, hat in enumerate(hats) if index != i)

View File

@ -1,37 +0,0 @@
class Solution(object):
def largestOverlap(self, A, B):
self.init(A, B)
return max(
self.compare(x_trans, y_trans)
for x_trans in range(-(self.row_length-1), self.row_length)
for y_trans in range(-(self.column_count-1), self.column_count)
)
def init(self, A, B):
self.A = A
self.B = B
self.row_length = len(A[0])
self.column_count = len(A)
def compare(self, x_trans, y_trans):
overlap_count = 0
for row_selection in range(
max(y_trans, 0),
min(self.column_count, self.column_count + y_trans)
):
for column_selection in range(
max(x_trans, 0),
min(self.row_length, self.row_length + x_trans)
):
if (
self.A[row_selection][column_selection] ==
self.B[row_selection - y_trans][column_selection - x_trans] == 1
):
overlap_count += 1
return overlap_count
if __name__ == '__main__':
sol = Solution()
sol.init([[1,1,0],[0,1,0],[0,1,0]],
[[1,0,0],[0,1,1],[0,0,1]])
print(sol.compare(-1, -1))

View File

@ -1,64 +0,0 @@
import os
import errno
from invoke import run, Collection, task as ctask
def link_filenames(ctx, link_pairs, force=False):
for source, destination in link_pairs:
destination = os.path.expanduser(destination)
source = os.path.expanduser(source)
if force:
ctx.run("sudo rm -rf {0}".format(destination))
if os.path.exists(destination):
print("Skipping {0} because path already exists".format(destination))
else:
print("Linking {0} to {1}".format(destination, source))
ctx.run('ln -s {0} {1}'.format(source, destination))
def ensure_path_exists(path):
try:
os.makedirs(path)
except OSError as exception:
if exception.errno != errno.EEXIST:
raise
def command_exists(command, run=run):
return run("hash {0}".format(command), warn=True, hide=True).exited == 0
def build_task_factory(ns):
def task(function, *args, **kwargs):
ns.add_task(ctask(function, *args, **kwargs))
return function
return task
def namespace_and_factory(*args, **kwargs):
ns = Collection(*args, **kwargs)
return ns, build_task_factory(ns)
def extension_checker(extension):
extension_suffix = ".{}".format(extension)
def ends_with(string):
return string.endswith(extension_suffix)
return ends_with
def tasks_from_directory(directory_path, file_predicate=extension_checker("sh")):
ns, make_task = namespace_and_factory(os.path.basename(directory_path))
def task_from_file(filepath):
def run_script(ctx):
ctx.run(filepath)
return make_task(run_script, name=os.path.basename(filepath).split(os.path.extsep)[0])
filepaths = filter(
os.path.isfile,
[os.path.join(directory_path, filename)
for filename in os.listdir(directory_path)],
)
list(map(task_from_file, filepaths))
return ns

View File

@ -1,168 +0,0 @@
#!/usr/bin/env python
import heapq
from collections import namedtuple
BuyOpportunity = namedtuple(
'BuyOpportunity',
['trans_yield', 'start', 'end']
)
def maximize_profit(num_transactions, prices):
minima, maxima = find_extrema(prices)
opp_queue, reverse_opp_queue = make_opportunity_queues(
minima, maxima, prices,
)
if not opp_queue:
return []
largest_segment = opp_queue[0][1]
# Segments will be kept in sorted order
segments = [largest_segment]
# Remove any reverse yields that are greater than the largest actual yield
# since they can never be realized anyway.
while (reverse_opp_queue and reverse_opp_queue[0][1].trans_yield >=
largest_segment.trans_yield):
heapq.heappop(reverse_opp_queue)
def try_rev_opp():
# It is okay to definitely pop here even though we don't know that we
# can actually use the opp for the following reason:
# Since the rev opp queue was selected OVER that of the opp queue, we
# KNOW that the bounding segment that includes this rev opp must have
# already been selected if it is going to be included at all (since it
# must have greater yield).
_, rev_opp_can = heapq.heappop(reverse_opp_queue)
for (seg_index, split_seg) in enumerate(segments):
if split_seg.end >= rev_opp_can.end:
# Since segments is sorted, this must be the correct segment
break
else:
return
if split_seg.start <= rev_opp_can.start:
# We found the containing segment
left_yield = prices[rev_opp_can.start] - prices[split_seg.start]
right_yield = prices[split_seg.end] - prices[rev_opp_can.end]
left_segment = BuyOpportunity(left_yield, split_seg.start, rev_opp_can.start)
right_segment = BuyOpportunity(right_yield, rev_opp_can.end, split_seg.end)
segments.pop(seg_index)
segments.insert(seg_index, left_segment)
segments.insert(seg_index + 1, right_segment)
def try_opp():
_, opp = heapq.heappop(opp_queue)
if not segments:
segments.append(opp)
insertion_index = 0
for (index, seg) in enumerate(segments):
if seg.start >= opp.start:
insertion_index = index
break
else:
insertion_index = len(segments)
seg = None
previous_seg = segments[insertion_index - 1] if insertion_index > 0 else None
if ((seg is None or seg.start >= opp.end) and
(previous_seg is None or previous_seg.end <= opp.start)):
# There is no overlap, so we can insert
segments.insert(insertion_index, opp)
else:
pass
while (opp_queue or reverse_opp_queue) and len(segments) < num_transactions:
if not reverse_opp_queue:
try_opp()
elif not opp_queue:
try_rev_opp()
else:
opp_can = opp_queue[0][1]
rev_opp_can = reverse_opp_queue[0][1]
if rev_opp_can.trans_yield > opp_can.trans_yield:
try_rev_opp()
else:
try_opp()
return segments
def make_opportunity_queues(minima, maxima, prices):
opp_queue = []
reverse_opp_queue = []
for min_index, minimum in enumerate(minima):
for max_index, maximum in enumerate(maxima):
transaction_yield = prices[maximum] - prices[minimum]
if transaction_yield < 0:
# We can ignore this pair because the transaction has negative
# yield.
continue
# minimum comes before maximum in time
if minimum < maximum:
# Transaction yield is made negative because heapq is a min-heap
heapq.heappush(
opp_queue, ((-transaction_yield, maximum - minimum), BuyOpportunity(
transaction_yield, minimum, maximum,
)),
)
else:
heapq.heappush(
reverse_opp_queue, (-transaction_yield, BuyOpportunity(
transaction_yield, maximum, minimum,
))
)
return opp_queue, reverse_opp_queue
def find_extrema(prices):
maxima = []
minima = []
length_of_prices = len(prices)
if length_of_prices < 2:
return minima, maxima
upwards = None
last = prices[0]
for (index, price) in enumerate(prices):
if price < last:
if upwards is True:
maxima.append(index - 1)
elif upwards is None:
# We set the starting price as a maximum, but theres no point
# since we would really never buy.
maxima.append(0)
pass
upwards = False
elif price > last:
if upwards is False:
minima.append(index - 1)
elif upwards is None:
# The starting value is a minimum
minima.append(0)
upwards = True
last = price
if upwards is True:
maxima.append(length_of_prices - 1)
elif upwards is False:
minima.append(length_of_prices - 1)
return minima, maxima
if __name__ == '__main__':
print (maximize_profit(10, [0, 1, 3, 2, 3, 0, 10, 12, 1, 2, 3, 2, 0, 2, 4, 3, 6, 4, 14, 1, 0, 2, 4, 5, 4, 5, 6]))
print [
BuyOpportunity(trans_yield=1, start=0, end=1),
BuyOpportunity(trans_yield=12, start=2, end=4),
BuyOpportunity(trans_yield=2, start=5, end=7),
BuyOpportunity(trans_yield=6, start=9, end=11),
BuyOpportunity(trans_yield=6, start=12, end=13),
BuyOpportunity(trans_yield=10, start=14, end=15),
BuyOpportunity(trans_yield=5, start=17, end=20),
BuyOpportunity(trans_yield=2, start=21, end=23),
]

View File

@ -1,58 +0,0 @@
#! /usr/bin/env python
import itertools
def textJustify(input_string, justification_length):
partitioning = partition_paragraph(input_string, justification_length)
return "\n".join(itertools.chain(
(justify_line(line_partition, justification_length)
for line_partition in partitioning[:-1]),
[" ".join(partitioning[-1])]
))
def justify_line(line_words, justification_length):
if len(line_words) == 1:
return line_words[0]
total_length = sum(len(word) for word in line_words)
word_count = len(line_words)
number_of_word_boundaries = word_count - 1
spaces_to_add = justification_length - total_length
base_spaces = spaces_to_add // number_of_word_boundaries
extra_spaces = spaces_to_add % number_of_word_boundaries
output_string = ""
for i, word in enumerate(line_words):
output_string += word
if i >= len(line_words) - 1:
break
space_count = base_spaces
if i < extra_spaces:
space_count += 1
spaces = " " * space_count
output_string += spaces
return output_string
def partition_paragraph(input_string, justification_length):
min_line_length = 0
partitioning = []
current = []
for word in input_string.split():
word_length = len(word)
length_with_word = min_line_length + word_length
if justification_length < length_with_word:
partitioning.append(current)
current = []
min_line_length = 0
length_with_word = word_length
current.append(word)
min_line_length = length_with_word + 1
if current:
partitioning.append(current)
return partitioning
if __name__ == '__main__':
sample = "Coursera provides universal access to the world's best education, partnering with to universities and organizations to offer courses online."
print(textJustify(sample, 10))

View File

@ -1,68 +0,0 @@
#!/usr/bin/env python
import sys
class KnightMoves(object):
deltas = [(1, 2), (2, 1), (-1, 2), (-2, 1),
(1, -2), (2, -1), (-1, -2), (-2, -1)]
max_x = 8
max_y = 8
def count_knight_moves(self, start, end):
this_generation = [start]
move_count = 0
seen = set()
while True:
for position in this_generation:
if position in seen:
continue
elif position == end:
return move_count
else:
seen.add(position)
this_generation = list(self.generate_moves_from_generation(this_generation))
move_count += 1
def generate_moves_from_generation(self, previous_generation):
return (
position
for ancestor in previous_generation
for position in self.generate_moves_from_position(ancestor)
)
def generate_moves_from_position(self, position):
x, y = position
return (
(x + delta_x, y + delta_y)
for delta_x, delta_y in self.deltas
if self.in_bounds(x + delta_x, y + delta_y)
)
def in_bounds(self, x, y):
return 0 <= x < self.max_x and 0 <= y < self.max_y
def file_to_index(file_char):
assert 'a' <= file_char <= 'h'
return ord(file_char) - 97
def rank_to_index(rank):
assert 0 < int(rank) <= 8
return int(rank) - 1
def square_name_to_indices(square_name):
file_char, rank_char = square_name
return rank_to_index(int(rank_char)), file_to_index(file_char)
if __name__ == '__main__':
print KnightMoves().count_knight_moves(
square_name_to_indices(sys.argv[1]),
square_name_to_indices(sys.argv[2])
)

View File

@ -1,11 +0,0 @@
import logging
from coloredlogs import ColoredStreamHandler
def enable_logger(log_name, level=logging.DEBUG):
log = logging.getLogger(log_name)
handler = ColoredStreamHandler(severity_to_style={'WARNING': dict(color='red')})
handler.setLevel(level)
log.setLevel(level)
log.addHandler(handler)

View File

@ -1,90 +0,0 @@
#!/usr/bin/env python
import sys
from cached_property import cached_property
class PalindromeSubstringFinder(object):
def __init__(self, input_string):
self.input_string = input_string
self.input_length = len(input_string)
self.position_count = (len(input_string) * 2) - 1
self._palindrome_lengths = [0] * self.position_count
self._total_comparisons = 0
self._max_palindrome_index = 0
@property
def longest_palindrome(self):
index = self._max_palindrome_index // 2
half_length = self.longest_palindrome_length // 2
offset = self.longest_palindrome_length % 2
left_index = index - half_length + 1 - offset
right_index = index + half_length + 1
return self.input_string[left_index:right_index]
@property
def longest_palindrome_length(self):
return self._palindrome_lengths[self._max_palindrome_index]
def print_current_state(self, position):
print ("".join(map(str, self._palindrome_lengths)))
print (" ".join(self.input_string))
print ("{0}^".format(" " * position))
def palindrome_lengths(self):
if self.input_string is None or self.input_length < 1:
return []
max_reach = -1
for position in range(self.position_count):
starting_offset = self._palindrome_lengths[position] // 2
index = position // 2
right_offset = position % 2
palindrome_length, max_index = self.get_length_from_indices(
index - starting_offset,
index + starting_offset + right_offset,
)
self._palindrome_lengths[position] = palindrome_length
if palindrome_length > self._palindrome_lengths[self._max_palindrome_index]:
self._max_palindrome_index = position
if max_reach < max_index:
max_reach = max_index
self.copy_palindrome_lengths(position, palindrome_length)
self.print_current_state(position)
return self._palindrome_lengths
def copy_palindrome_lengths(self, position, palindrome_length):
# b a a b a a b
# 1004007004001
for offset in range(1, palindrome_length - 1):
self._palindrome_lengths[position + offset] = (
min(
self._palindrome_lengths[position - offset],
palindrome_length - offset
)
)
def get_length_from_indices(self, left_index, right_index):
while (left_index >= 0 and right_index < self.input_length and
self.input_string[left_index] == self.input_string[right_index]):
self._total_comparisons += 1
left_index -= 1
right_index += 1
self._total_comparisons += 1
# We'll always go one set of indices PAST the point that we should have,
# so right_index - left_index + 1 becomes the below:
return right_index - left_index - 1, right_index - 1
if __name__ == '__main__':
target = sys.argv[1]
finder = PalindromeSubstringFinder(target)
finder.palindrome_lengths()
print(finder._total_comparisons)
print(finder._max_palindrome_index)
print(finder.longest_palindrome)

View File

@ -1,84 +0,0 @@
class Node(object):
def __init__(self, key, value, next_node=None, prev_node=None):
self.key = key
self.value = value
self.next_node = next_node
self.prev_node = prev_node
def print_list(self):
print("{0} - {1}".format(self.key, self.value))
if self.next_node is not None:
assert self == self.next_node.prev_node
self.next_node.print_list()
else:
print("next node is None")
if self.prev_node is not None:
assert self.prev_node.next_node == self
class LRUCache(object):
def __init__(self, capacity):
self.capacity = capacity
self.cache = {}
self.head = None
self.tail = None
def put(self, key, value):
"""
If key already exists, replace the current value with the new value.
If the key doesn't exist, add the new key/value entry to the cache.
If the addition of the new entry causes the number of entries to exceed
num_entries, remove the oldest entry based on the last time the entry is
accessed (either through put or get).
"""
if key in self.cache:
node = self.cache[key]
node.value = value
self.move_to_tail(node)
return
if len(self.cache) >= self.capacity:
old_head = self.remove_from_head()
del self.cache[old_head.key]
new_node = Node(key, value)
self.set_new_tail(new_node)
self.cache[key] = new_node
def set_new_tail(self, node):
node.prev_node = self.tail
if self.tail is not None:
self.tail.next_node = node
self.tail = node
if self.head is None:
self.head = node
def move_to_tail(self, node):
if node is self.tail:
return
if node.prev_node is None: # This is the head
if node.next_node is not None:
self.head = node.next_node
node.next_node.prev_node = None
else:
node.prev_node.next_node = node.next_node
node.next_node.prev_node = node.prev_node
node.prev_node = self.tail
self.tail.next_node = node
self.tail = node
self.tail.next_node = None
def remove_from_head(self):
previous_head = self.head
self.head = self.head.next_node
self.head.prev_node = None
return previous_head
def get(self, key):
"""Return the value associated with the key, or None if the key doesn't
exist."""
node = self.cache.get(key)
if node is not None:
self.move_to_tail(node)
return node.value

View File

@ -1,28 +0,0 @@
import re
class LeafMatcher(object):
def __init__(self, regexp):
self.matcher = re.compile(regexp)
def matches(self, string):
return bool(self.matcher.search(string))
class AndMatcher(object):
def __init__(self, *matchers):
self.matchers = matchers
def matches(self, string):
return all(matcher.matches(string) for matcher in self.matchers)
class OrMatcher(object):
def __init__(self, *matchers):
self.matchers = matchers
def matches(self, string):
return any(matcher.matches(string) for matcher in self.matchers)

View File

@ -1,51 +0,0 @@
class Result(object):
def __init__(self, lb, excluded, rb, max):
self.lb = lb
self.excluded = excluded
self.rb = rb
self.max = max
def __repr__(self):
return "left bound: {0}, excluded {1}, right_bound {2}, max {3}".format(
self.lb,
self.excluded,
self.rb,
self.max,
)
def max_double_slice(array):
left_slices = max_slice_at_index(array)
right_slices = max_slice_at_index(array[::-1])[::-1]
print left_slices
print right_slices
def slice_sum(index):
left_contribution = left_slices[index - 1][-1] if index > 0 else 0
right_contribution = right_slices[index + 1][-1] if index < len(array) - 2 else 0
return right_contribution + left_contribution
maximizing_slice_index = max((i for i in range(len(array))), key=slice_sum)
left_bound, lmax = left_slices[maximizing_slice_index - 1] if maximizing_slice_index > 0 else (0, 0)
rs, rmax = right_slices[maximizing_slice_index + 1] if maximizing_slice_index < len(array) - 2 else (len(array) - 1, 0)
right_bound = len(array) - 1 - rs
return Result(left_bound, maximizing_slice_index, right_bound, lmax + rmax)
def max_double_slice_value(array):
left, right = max_double_slice(array)
return left[-1] + right[-1]
def max_slice_at_index(array):
max_at_index = []
for index, array_value in enumerate(array):
last_start, last_max = max_at_index[-1] if max_at_index else (index, 0)
if last_max < 0:
current_max = array_value
current_start = index
else:
current_max = array_value + last_max
current_start = last_start
max_at_index.append((current_start, current_max))
return max_at_index

View File

@ -1,67 +0,0 @@
class MemoryManager(object):
def __init__(self, N=1024):
self._memory = [None for _ in range(1024)]
self._memory_size = N
self._last_block = self._memory_size - 1
self._allocations = []
def malloc(self, size):
next_startpoint = 0
minimum_size = None
minimum_start = None
for (allocation_start_point, allocation_size) in self._allocations:
current_block_size = allocation_start_point - next_startpoint
if (current_block_size > size):
if (minimum_size is None or minimum_size > current_block_size):
minimum_size = current_block_size
minimum_start = next_startpoint
next_startpoint = allocation_start_point + allocation_size - 1
current_block_size = self._memory_size - next_startpoint
if (current_block_size > size):
if (minimum_size is None or minimum_size > current_block_size):
minimum_size = current_block_size
minimum_start = next_startpoint
if minimum_start is None:
raise Exception("Could not allocate enough space")
for (index, (allocation_start, _)) in enumerate(self._allocations):
if allocation_start > minimum_start:
self._allocations.insert(index, (minimum_start, size))
break
else:
self._allocations.append((minimum_start, size))
return minimum_start
def free(self, pointer):
for (index, (start_point, _)) in enumerate(self._allocations):
if pointer == start_point:
self._allocations.pop(index)
break
else:
raise Exception("Unrecognized pointer")
if __name__ == '__main__':
mm = MemoryManager()
allocations = [mm.malloc(5) for _ in range(100)]
for allocation in allocations[:50]:
mm.free(allocation)
print("nice", mm.malloc(200))
print("cool", mm.malloc(400))
print(mm._allocations)
# a = mm.malloc(100)
# b = mm.malloc(500)
# print(a, b)
# print(mm._allocations)
# mm.free(a)
# print(mm._allocations)
# mm.free(b)
# print(mm._allocations)
# c = mm.malloc(1000)
# print(mm._allocations)

View File

@ -1,59 +0,0 @@
import heapq
class ListNode(object):
def __init__(self, x):
self.val = x
self.next = None
def merge_sorted_linked_lists(lists):
min_queue = []
result = ListNode(None)
current_node = result
for list_index, alist in enumerate(lists):
if alist is None:
continue
heap_value = (alist.val, list_index)
lists[list_index] = alist.next
heapq.heappush(min_queue, heap_value)
while min_queue:
value, list_index = heapq.heappop(min_queue)
selected_list = lists[list_index]
if selected_list is not None:
lists[list_index] = selected_list.next
heapq.heappush(min_queue, (selected_list.val, list_index))
new_node = ListNode(value)
current_node.next = new_node
current_node = new_node
return result.next
def make_linked_list(*values):
result = None
for value in values[::-1]:
new_node = ListNode(value)
new_node.next = result
result = new_node
return result
def linked_list_to_list(ll):
result = []
while ll:
result.append(ll.val)
ll = ll.next
return result
if __name__ == '__main__':
lists = [
make_linked_list(1, 10, 20, 21),
make_linked_list(16, 17, 18, 22),
make_linked_list(12, 13, 24),
make_linked_list(-1, 15, 24),
make_linked_list(-4),
None,
]
res = merge_sorted_linked_lists(lists)
print (linked_list_to_list(res))

View File

@ -1,49 +0,0 @@
#!/usr/bin/env python
import re
history_entry_re = re.compile(": ([0-9]*):[0-9]*;.*")
class dedup(set):
def contains_add(self, elem):
if elem in self:
return True
else:
self.add(elem)
return False
def merge_history(filenames):
entries = dedup()
pairs = [entry for filename in filenames for entry in entry_timestamp_pairs(filename)]
return "".join([
p[0] for p in sorted(pairs, key=lambda pair: pair[1])
if not entries.contains_add(p[0])
])
def entry_timestamp_pairs(filename):
pairs = []
with open(filename, 'r') as file:
entry = None
timestamp = None
for line in file.readlines():
if line[0] == ":":
if entry != None:
pairs.append((entry, timestamp))
timestamp = timestamp_from_line(line)
entry = line
else:
entry += line
return pairs
def timestamp_from_line(line):
return int(history_entry_re.search(line).group(1))
if __name__ == '__main__':
import sys
print merge_history(sys.argv[1:])

View File

@ -1,261 +0,0 @@
import collections
import random
import pprint
do_print = False
def maybe_print(*args):
if do_print:
print(*args)
def falling_factorial(n, k):
current = n
product = 1
for _ in range(k):
product *= current
current -= 1
def one_hand_with_lead_odds(card, num_players, is_trump=False, revealed_card=6, revealed_in_suit=False):
num_stronger_cards = 14 - card
if is_trump:
if card < revealed_card:
num_stronger_cards -= 1
elif is_trump is None:
if revealed_in_suit:
if card < revealed_card:
num_stronger_cards -= 1
else:
num_stronger_cards += 12
odds_no_stronger_card_out = 1.0
num_cards_remaining = 50
num_weaker_cards_remaining = num_cards_remaining - num_stronger_cards
for i in range(num_players-1):
odds_no_stronger_card_out *= num_weaker_cards_remaining/num_cards_remaining
num_weaker_cards_remaining -= 1
num_cards_remaining -= 1
return odds_no_stronger_card_out
def expected_value_of_one_bid(*args, **kwargs):
win_prob = one_hand_with_lead_odds(*args, **kwargs)
one_ev = win_prob + ((1 - win_prob) * -1)
zero_ev = -win_prob
return (one_ev - zero_ev, win_prob, one_ev, zero_ev)
def odds_for_number_of_players(num_players, is_trump=False):
for card in range(2, 15):
print(card)
print(expected_value_of_one_bid(card, num_players, is_trump=is_trump))
def random_permutation_of_size(n):
remaining = list(range(n))
for i in range(n-1, -1, -1):
yield remaining.pop(random.randint(0, i))
def random_permutation(the_list):
for index in random_permutation_of_size(len(the_list)):
yield the_list[index]
deck_of_cards = [
(number, suit)
for number in range(2, 15)
for suit in range(4)
]
def compare_cards(card1, card2, trump_suit, led_suit):
card1value, card1suit = card1
card2value, card2suit = card2
if card1suit == card2suit and card1suit in (trump_suit, led_suit):
return card1value - card2value
if card1suit == trump_suit:
return 1
if card2suit == trump_suit:
return -1
if card1suit == led_suit:
return 1
if card2suit == led_suit:
return -1
return 0
lost_hands = collections.defaultdict(int)
won_hands = collections.defaultdict(int)
performance_by_player_card = collections.defaultdict(
lambda: collections.defaultdict(lambda: collections.defaultdict(lambda: collections.defaultdict(int)))
)
def play_hand(player_order, player_to_strategy, player_scores, score_first_only=True):
bids = {}
shuffled_deck = list(random_permutation(deck_of_cards))
trump_card = shuffled_deck[len(player_order)]
trump_suit = None if trump_card[0] in [11, 12, 13] else trump_card[1]
led_suit = shuffled_deck[0][1]
highest_card = (0, led_suit)
winning_player = player_order[0]
maybe_print("Trump is {}, {} leads".format(trump_suit, player_order[0]))
if (trump_suit is not None and shuffled_deck[0][1] != trump_suit and
shuffled_deck[0][0] <= 8 and shuffled_deck[0][0] > 5):
# import ipdb; ipdb.set_trace()
pass
for player, card in zip(player_order, shuffled_deck):
if compare_cards(card, highest_card, trump_suit, led_suit) > 0:
highest_card = card
winning_player = player
bids[player] = player_to_strategy[player](card, trump_card, bids, len(player_order))
maybe_print("{} got {} and bid {}".format(player, card, bids[player]))
maybe_print("{} won the hand with {}".format(winning_player, highest_card))
for player in player_order:
bid = bids[player]
score = 0
if winning_player == player:
if bid == 1:
score = 1
won_hands[player] += 1
else:
score = -1
lost_hands[player] += 1
else:
if bid != 0:
score = -1
lost_hands[player] += 1
player_scores[player] += score
trump_of_card = None if trump_suit is None else shuffled_deck[0][1] == trump_suit
performance_dict = performance_by_player_card[player][trump_of_card][shuffled_deck[0][0]]
performance_dict[score] += 1
performance_dict["total"] += score
if score_first_only:
break
def optimal_strategy(card, trump_card, bids, number_of_players):
trump_suit = None if trump_card[0] in [11, 12, 13] else trump_card[1]
card_is_trump = card[1] == trump_card[1]
if trump_suit == None:
card_is_trump = None
if len(bids) == 0:
odds = expected_value_of_one_bid(
card[0], number_of_players, is_trump=card_is_trump,
revealed_card=trump_card[0], revealed_in_suit=card[1] == trump_card[1]
)
return 1 if odds[0] > 0 else 0
if card_is_trump:
return handle_non_first_bid(card, trump_card, bids, number_of_players)
else:
return 0
def handle_non_first_bid(card, trump_card, bids, number_of_players):
count_greater = 14 - card[0]
count_smaller = card[0] - 2
if trump_card[0] < card[0]:
count_smaller -= 1
else:
count_greater -= 1
odds_of_random_trump_smaller = float(count_smaller) / (count_smaller + count_greater)
bid_sum = sum(bids.values())
if bid_sum == 0:
return 1
elif bid_sum == 1 and odds_of_random_trump_smaller > (float(1)/3):
return 1
elif bid_sum == 2 and count_greater <= 3:
return 1
elif count_greater <= 1:
return 1
return 0
def mom_strat(card, trump_card, bids, number_of_players):
trump_suit = None if trump_card[0] in [11, 12, 13] else trump_card[1]
card_is_trump = card[1] == trump_card[1]
if trump_suit == None:
card_is_trump = None
if len(bids) > 0:
if card_is_trump:
return handle_non_first_bid(card, trump_card, bids, number_of_players)
return 0
if trump_suit == None:
if card[0] >= 5:
return 1
if card_is_trump:
return 1
if card[0] >= 13:
return 1
return 0
player_strategies = {
"optimal": optimal_strategy,
"mom1": mom_strat,
"mom2": mom_strat,
"mom3": mom_strat,
}
def simulate_hands(strats, number_of_hands):
player_order = list(player_strategies.keys())
scores = collections.defaultdict(int)
for _ in range(number_of_hands):
play_hand(player_order, strats, scores)
player_order = player_order[-1:] + player_order[:-1]
print(scores)
return scores
odds_for_number_of_players(4, is_trump=False)
print("score")
scores = simulate_hands(player_strategies, 100000)
highest_score = -10000000
winner = None
for player, score in scores.items():
if score > highest_score:
highest_score = score
winner = player
print("{} won with {}".format(winner, highest_score))
print("won")
print(won_hands)
print("lost")
print(lost_hands)
# for player, by_trump in performance_by_player_card.items():
# print(player)
# for trump_type, card_to_score in by_trump.items():
# print("{}: {}".format(trump_type, sum(card_to_score.values())))
pprint.pprint(performance_by_player_card["optimal"][None])
pprint.pprint(performance_by_player_card["mom1"][None])

View File

@ -1,20 +0,0 @@
from backports.functools_lru_cache import lru_cache
def parenthesizations(pairs):
parenthesizations = _parenthesizations(pairs * 2, net=0)
return parenthesizations
@lru_cache(maxsize=None)
def _parenthesizations(length, net=0):
if net == length:
return [')' * length]
res = prepend('(', _parenthesizations(length-1, net=net + 1))
if net > 0:
res.extend(prepend(')', _parenthesizations(length-1, net=net - 1)))
return res
def prepend(char, items):
return [char + item for item in items]

View File

@ -1,31 +0,0 @@
from backports.functools_lru_cache import lru_cache
def parenthesizations2h(pair_count):
return [
'{0:b}'.format(item).replace('1', '(').replace('0', ')')
for item in parenthesizations.parenthesizations(pair_count)
]
def parenthesizations2(pair_count):
"""Parenthesizations returned encoded as numbers
"""
parenthesizations = _parenthesizations2(pair_count * 2, net=0)
return parenthesizations
@lru_cache(maxsize=None)
def _parenthesizations2(length, net=0):
if net > length or net < 0:
raise Exception()
if net == length:
return [0]
res = add_bit(length-1, _parenthesizations(length-1, net=net + 1))
if net > 0:
res.extend(_parenthesizations(length-1, net=net - 1))
return res
def add_bit(bitindex, items):
value = 2 ** bitindex
return [value + item for item in items]

View File

@ -1,8 +0,0 @@
#!/usr/bin/env python
import parse_go_testify_not_equal
import sys
import json
if __name__ == '__main__':
actual, expected = parse_go_testify_not_equal.get_strings(sys.stdin.read())
print json.dumps({"actual": actual, "expected": expected})

View File

@ -1,29 +0,0 @@
#!/usr/bin/env python
import re
import sys
import tempfile
from subprocess import call
expected_re = re.compile("Error:\\s*Not equal:\\s*(.*?)\\s*\\(expected\\)")
actual_re = re.compile("!=\\s*(.*?)\\s*\(actual\)")
def get_strings(incoming):
expected_match = expected_re.search(incoming)
actual_match = actual_re.search(incoming)
return (eval(expected_match.group(1)), eval(actual_match.group(1)))
if __name__ == '__main__':
stdin = sys.stdin.read()
_, expected_filename = tempfile.mkstemp()
_, actual_filename = tempfile.mkstemp()
expected_text, actual_text = get_strings(stdin)
with open(expected_filename, 'w') as expected_file:
expected_file.write(expected_text)
with open(actual_filename, 'w') as actual_file:
actual_file.write(actual_text)
call(["icdiff", expected_filename, actual_filename, "--show-all-spaces"])

View File

@ -1,175 +0,0 @@
import itertools
from fractions import Fraction
class PlaneGroup(object):
FUEL_CAPACITY = Fraction(1, 1)
def __init__(
self, quantity=1, fuel=None, position=Fraction(0, 1),
returning=False
):
self.quantity = quantity
self.fuel = fuel or quantity * Fraction(1, 1)
self.position = position
self.returning = returning
def split(self, quantity=1, fuel=Fraction(0, 0)):
new = type(self)(quantity, fuel, self.position, self.returning)
remaining = type(self)(
quantity=self.quantity - quantity, fuel=self.fuel - fuel,
position=self.position, returning=self.returning
)
return new, remaining
def split_to_next_max(self):
desired_remaining_fuel = (self.quantity - 1) * self.FUEL_CAPACITY
turnback_fuel = self.fuel - desired_remaining_fuel
return self.split(fuel=turnback_fuel)
def join(self, other):
assert self.position == other.position
return type(self)(
quantity=self.quantity + other.quantity, fuel=self.fuel + other.fuel,
position=self.position, returning=other.returning,
)
@property
def effective_position(self):
if self.returning:
return self.position * -1
@property
def effective_fuel(self):
if self.returning:
return (self.position + self.FUEL_CAPACITY) * self.quantity
def distance_to_next_turnback(self, support=None):
desired_remaining_fuel = (self.quantity - 1) * self.FUEL_CAPACITY
if self.fuel < desired_remaining_fuel:
return Fraction(0, 1)
if support is None:
return (
(self.fuel - self.position - desired_remaining_fuel)
/
(self.quantity + 1)
)
fuel_needed_for_turnback = (self.position - support.effective_position)/2
target_fuel_at_turnback = (
desired_remaining_fuel + fuel_needed_for_turnback
)
turnback_to_get_to_support = (self.fuel - target_fuel_at_turnback) / self.quantity
return turnback_to_get_to_support
def _advance_to_refuel(self, exact=True):
fuel_needed = self._fuel_needed_for_distance(self.position)
if exact and fuel_needed != self.fuel:
raise Exception("Inexact refuel")
return type(self)(
quantity=self.quantity, fuel=self.quantity*self.FUEL_CAPACITY
)
def _fuel_needed_for_distance(self, distance):
fuel_needed = distance * self.quantity
if self.fuel >= fuel_needed:
return self.fuel - fuel_needed
raise Exception("Not enough fuel")
def advance(self, distance):
if self.returning:
if distance < self.position:
new_fuel = self.fuel - self._fuel_needed_for_distance(distance)
new_position = self.position - distance
return type(self)(
fuel=new_fuel, position=new_position, returning=True,
quantity=self.quantity,
)
else:
return self._advance_to_refuel().advance(distance-self.position)
new_fuel = self.fuel - self._fuel_needed_for_distance(distance)
new_position = self.position + distance
return type(self)(
fuel=new_fuel, position=new_position, quantity=self.quantity,
)
def run_simulation(plane_count):
return SimulationState(PlaneGroup(quantity=plane_count), []).simulate()
class SimulationState(object):
def __init__(self, destination_group, support_groups):
self.destination_group = destination_group
self.support_groups = support_groups
@property
def supports_by_advancement(self):
return sorted(
self.support_groups, key=lambda group: group.effective_position,
reverse=True,
)
@property
def returning_supports(self):
return (support for support in self.support_groups if support.returning)
@property
def nearest_support(self):
s = self.supports_by_advancement
if s:
return s[0]
@property
def min_time_to_support_collision(self):
min_time_to_collision = float('inf')
for returning in self.returning_supports:
for support in self.support_groups:
distance_between_supports = returning.position - support.effective_position
time_to_collision = distance_between_supports/2
meeting_position = time_to_collision + support.effective_position
if meeting_position > 0:
if time_to_collision < min_time_to_collision:
min_time_to_collision = time_to_collision
return min_time_to_collision
@property
def min_turnback_time(self):
min_time_to_turnback = float('inf')
turnback_group = self.destination_group
for support in self.supports_by_advancement:
time_to_turnback = turnback_group.distance_to_next_turnback(support)
if min_time_to_turnback > time_to_turnback:
min_time_to_turnback = time_to_turnback
turnback_group = support
# Check the last group
time_to_turnback = turnback_group.distance_to_next_turnback(None)
if min_time_to_turnback > time_to_turnback:
min_time_to_turnback = time_to_turnback
return min_time_to_turnback
def _advance(self, distance):
return type(self)(
self.destination_group.advance(distance),
[support.advance(distance) for support in self.support_groups],
)
def simulate(self):
support_returns = [
support.position
for support in self.support_groups if support.returning
]
next_event = min(
support_returns + [self.min_time_to_support_collision, self.min_turnback_time]
)
self._advance(next_event)
nearest_support_return = min(support_returns)
return self._advance(nearest_support_return).simulate()
return self._advance(turnback_distance)._start_turnback()

View File

@ -1,63 +0,0 @@
def groupings(numbers):
if len(numbers) == 0:
raise StopIteration()
if len(numbers) == 1:
yield ((numbers[0],),)
raise StopIteration()
this_number = numbers[0]
this_tuple = (this_number,)
next_groupings = groupings(numbers[1:])
for grouping in next_groupings:
yield (this_tuple,) + grouping
yield (this_tuple + grouping[0],) + grouping[1:]
def plus_minus_n(numbers, n):
for grouping in groupings(numbers):
numbers = map(group_to_int, grouping)
pms = plus_minuses(len(numbers) - 1)
for pm in pms:
result = compute(numbers, pm)
string = generate_string(numbers, pm)
if result == n:
yield string
def generate_string(numbers, pms):
string = ''
for number in numbers:
string += '{0}'.format(number)
if pms:
string += '+' if pms[0] is PLUS else '-'
pms = pms[1:]
return string
def compute(numbers, plus_minus):
sum = numbers[0]
remaining = numbers[1:]
while remaining:
if plus_minus[0] == PLUS:
sum += remaining[0]
else:
sum -= remaining[0]
remaining = remaining[1:]
plus_minus = plus_minus[1:]
return sum
def group_to_int(group):
return int(''.join(map(str, group)))
PLUS = object()
MINUS = object()
def plus_minuses(n):
if n == 0:
yield ()
raise StopIteration()
for pm in plus_minuses(n-1):
yield (PLUS,) + pm
yield (MINUS,) + pm

View File

@ -1,29 +0,0 @@
import collections
import copy
def powerset(elems):
counts = collections.defaultdict(int)
for elem in elems:
counts[elem] += 1
return powerset_helper(counts.items())
def powerset_helper(elems):
last_generation = [[]]
for (elem, count) in elems:
next_generation = last_generation
for _ in range(count):
new_generation = []
for subset in last_generation:
new_subset = copy.copy(subset)
new_subset.append(elem)
new_generation.append(new_subset)
next_generation.extend(new_generation)
last_generation = new_generation
last_generation = next_generation
return last_generation
if __name__ == '__main__':
print(len(powerset(range(23))))

View File

@ -1,36 +0,0 @@
def quicksort(incoming, lower=0, upper=None):
if upper is None:
upper = len(incoming)
if upper - lower < 2:
return
low_swap = lower
high_swap = upper - 1
replacing = high_swap
pivot = incoming[high_swap]
high_swap -= 1
while True:
if replacing > low_swap:
candidate = incoming[low_swap]
if candidate > pivot:
incoming[replacing] = candidate
replacing = low_swap
if low_swap == high_swap:
break
low_swap += 1
else:
candidate = incoming[high_swap]
if candidate < pivot:
incoming[replacing] = candidate
replacing = high_swap
if low_swap == high_swap:
break
high_swap -= 1
incoming[replacing] = pivot
quicksort(incoming, lower=lower, upper=replacing)
quicksort(incoming, lower=replacing+1, upper=upper)
if __name__ == '__main__':
my_list = [3, 20, 2, 52, 44, 16, 24, 5, 12, 4, 1, 14, 60, 29, 33, 1]
quicksort(my_list)
print(my_list)

View File

@ -1,84 +0,0 @@
def rotate_array(incoming, rotation_index):
new_back = incoming[:rotation_index]
new_front = incoming[rotation_index:]
new_front.extend(new_back)
return new_front
def binary_search(
array, item, low=0, high=None,
lower_predicate=lambda item, array, index, low, high: item <= array[index]
):
if low < 0:
raise ValueError('lo must be non-negative')
if high is None:
high = len(array)
while low < high:
mid = (low + high)//2
if lower_predicate(item, array, mid, low, high):
high = mid
else:
low = mid + 1
return low
class RotatedArrayProxy(object):
def __init__(self, incoming):
self.incoming = incoming
self._rotation_index = None
if incoming:
# Duplicates can not span the rotation
assert incoming[0] != incoming[-1]
def __getitem__(self, item):
if not isinstance(item, slice):
return self.incoming[self._actual_index(item)]
else:
self._handle_slice(item)
def _actual_index(self, index):
if index is None:
return index
elif 0 <= index < len(self.incoming):
return (index + self.rotation_index) % len(self.incoming)
elif index == len(self.incoming):
return self.rotation_index
else:
raise Exception()
@property
def rotation_index(self):
if self._rotation_index is None:
self._rotation_index = self._find_rotation_index()
return self._rotation_index
def _find_lower_predicate(self, item, array, index, low, high):
return array[0] > array[index]
def _find_rotation_index(self):
if len(self.incoming) < 1:
return 0
return binary_search(self.incoming, self.incoming[0],
lower_predicate=self._find_lower_predicate)
def __len__(self):
return len(self.incoming)
def sorted_insertion_index(self, x):
return binary_search(self, x)
def actual_insertion_index(self, x):
return self._actual_index(self.sorted_insertion_index(x))
def unrotated(self):
return rotate_array(self.incoming, self.rotation_index)
def insert(self, x):
insertion_index = self.actual_insertion_index(x)
if insertion_index < self.rotation_index or (
insertion_index == self.rotation_index and
(not self.incoming or x < self.incoming[0])
):
self._rotation_index += 1
self.incoming.insert(self.actual_insertion_index(x), x)

View File

@ -1,80 +0,0 @@
import rotated_array
# duplicates, slicing with stride, insertion index for item greater than everything for completely sorted array
def test_empty_rotated_array_proxy():
empty_rap = rotated_array.RotatedArrayProxy([])
assert empty_rap.rotation_index == 0
assert empty_rap.unrotated() == []
assert empty_rap.sorted_insertion_index(100) == 0
assert empty_rap.actual_insertion_index(100) == 0
def test_inserting_at_end_of_insertion_range():
rap = rotated_array.RotatedArrayProxy([3, 4, 5, 0, 2])
assert rap.rotation_index == 3
assert rap.unrotated() == [0, 2, 3, 4, 5]
assert rap.sorted_insertion_index(-1) == 0
assert rap.actual_insertion_index(-1) == 3
assert rap.sorted_insertion_index(1) == 1
assert rap.actual_insertion_index(1) == 4
assert rap.sorted_insertion_index(2) in [1, 2]
assert rap.actual_insertion_index(2) in [4, 5]
assert rap.sorted_insertion_index(3) in [2, 3]
assert rap.actual_insertion_index(3) in [0, 1]
def test_inserting_for_sorted_array():
rap = rotated_array.RotatedArrayProxy([0, 1])
assert rap.unrotated() == [0, 1]
assert rap.sorted_insertion_index(1000) == 2
assert rap.actual_insertion_index(1000) == 2
def test_inserting_largest_element():
rap = rotated_array.RotatedArrayProxy([3, 0, 1])
assert rap.rotation_index == 1
assert rap.sorted_insertion_index(1000) == 3
assert rap.actual_insertion_index(1000) == 1
def test_inserting_largest_element():
rap = rotated_array.RotatedArrayProxy([3, 0, 1])
assert rap.unrotated() == [0, 1, 3]
assert rap.actual_insertion_index(2) == 0
assert rap.actual_insertion_index(1) in [2, 3]
def test_rotation_index_and_unrotate():
arr = [3]*117 + [1] + [2]*16
rap = rotated_array.RotatedArrayProxy(arr)
assert rap[0] == 1
assert rap.rotation_index == 117
assert rap.unrotated() == sorted(arr)
arr = [3, 3, 3, 3, 1, 1, 1, 2, 2]
rap = rotated_array.RotatedArrayProxy(arr)
assert rap.rotation_index == 4
assert rap.unrotated() == sorted(arr)
rap = rotated_array.RotatedArrayProxy([3, 3, 3, 3, 1, 1, 1, 2])
assert rap.rotation_index == 4
assert rap.unrotated() == [1, 1, 1, 2, 3, 3, 3, 3]
def test_insert():
arr = [3]*117 + [1] + [2]*16
rap = rotated_array.RotatedArrayProxy(arr)
rap.insert(3)
rap.insert(3)
rap.insert(2)
rap.insert(2)
rap.insert(5)
rap.insert(24)
rap.insert(5)
rap.insert(4)
rap.insert(4)
assert rap.unrotated() == sorted(rap.incoming)

View File

@ -1,56 +0,0 @@
#!/usr/bin/env python
from optparse import OptionParser
from subprocess import Popen, PIPE
import select
import sys
import time
class IntervallicCommandRunner(object):
def __init__(self, command, command_interval, sleep_time=1):
self.command = command
self.command_interval = command_interval
self.sleep_time = sleep_time
self.last_time = None
self.read_last_time = True
@property
def can_read_from_stdin(self):
return sys.stdin in select.select([sys.stdin], [], [], 0)[0]
def _accumulate_input(self):
time_to_stop_after = self.last_time + self.command_interval
lines = []
new_time = time.time()
while new_time < time_to_stop_after:
for _ in range(10):
if self.can_read_from_stdin:
lines.append(sys.stdin.readline())
break
else:
time.sleep(self.sleep_time)
new_time = time.time()
self.last_time = new_time
return ''.join(lines)
def loop_indefinitely(self):
self.last_time = time.time()
while True:
Popen([self.command], shell=True, stdin=PIPE).communicate(
self._accumulate_input()
)
if __name__ == '__main__':
parser = OptionParser()
parser.add_option('-i', '--command-interval', dest="command_interval",
action="store", type="float", default=1.0)
parser.add_option('-c', '--command', dest="command", action="store",
default='cat')
options, _ = parser.parse_args()
IntervallicCommandRunner(options.command,
options.command_interval).loop_indefinitely()

View File

@ -1,31 +0,0 @@
#! /usr/bin/env python
import sys
def score_parentheses(input_string, index=0):
if index >= len(input_string):
return (0, index)
if input_string[index] == '(':
index += 1
else:
raise Exception("Invalid parentheses")
children_score, index = score_children(input_string, index)
if input_string[index] == ')':
index += 1
else:
raise Exception("Invalid parentheses")
return (children_score * 2 if children_score > 0 else 1, index)
def score_children(input_string, index=0):
input_length = len(input_string)
children_score = 0
while index < input_length and input_string[index] == '(':
child_score, index = score_parentheses(input_string, index)
children_score += child_score
return (children_score, index)
if __name__ == '__main__':
print (score_children(sys.argv[1]))

View File

@ -1,13 +0,0 @@
def segment(iterable, segment_length):
if segment_length is None:
yield iterable
raise StopIteration
def yield_length():
for _ in xrange(segment_length):
yield iterable.next()
while True:
segment = list(yield_length())
if not segment:
raise StopIteration
yield segment

View File

@ -1,128 +0,0 @@
#!/usr/bin/env python
import argparse
import os
class PathList(object):
@classmethod
def from_string(cls, path_string, separator=':'):
non_empty_paths = [path for path in path_string.split(separator) if path]
return cls(non_empty_paths, separator=':')
def __init__(self, paths, separator=':'):
self.paths = paths
self.separator = separator
def __str__(self):
return self.with_separator(self.separator)
def with_separator(self, separator=None):
separator = separator or self.separator
deduped = []
included = set()
for path in self.paths:
normalized = os.path.normpath(path)
if normalized not in included:
included.add(normalized)
deduped.append(path)
return separator.join(
os.path.normpath(path) for path in deduped
)
def add(self, new_paths, after=False, target=None):
# Remove the path if it already exists in self.paths to ensure
# that the new placement takes precedence
for path in new_paths:
done = False
while not done:
try:
self.paths.remove(path)
except:
done = True
if target:
target_index = self.paths.index(target)
else:
target_index = 0
if after:
increment = 1 if target else len(self.paths)
target_index += increment
self.paths = self.paths[:target_index] + new_paths + self.paths[target_index:]
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Manipulate path variables')
parser.add_argument(
'paths',
metavar='PATH',
type=str,
nargs='*',
help='paths to add',
)
parser.add_argument(
'--path-var', '-v',
help='the path var to add to.',
default='PATH',
)
parser.add_argument(
'--separator', '-s',
help='the separator of the path variable',
default=':',
)
parser.add_argument(
'--path-string',
help='the path string to edit',
default=None,
)
parser.add_argument(
'--after', '-a',
help=('whether to do the action after the target (if target is specified)'
'or the entire path variable'),
action='store_true',
default=True,
)
parser.add_argument(
'--before', '-b',
help='inverse of after',
dest='after',
action='store_false',
)
parser.add_argument(
'--target', '-t',
help='the target path',
default=None
)
parser.add_argument(
'--include-assignment', '-x',
action='store_true',
help='include the assignment command in output',
)
parser.add_argument(
'--print-separator', '-p',
help='separator to use for output',
default=None,
)
parser.add_argument(
'--path-lines', '-l',
help='use newlines to separate path output',
action='store_true',
default=False,
)
args = parser.parse_args()
path_string = args.path_string or os.environ.get(args.path_var, '')
path_list = PathList.from_string(path_string, separator=args.separator)
path_list.add(args.paths, after=args.after, target=args.target)
output_separator = '\n' if args.path_lines else args.print_separator
output_path = path_list.with_separator(separator=output_separator)
if args.include_assignment:
output = "export {}='{}'".format(args.path_var, output_path)
else:
output = output_path
print(output, end='')

View File

@ -1,24 +0,0 @@
def singleton(klass):
original_init = klass.__dict__.get('__init__')
class klassMeta(type, klass):
def __init__(self, *args):
original_init(self)
super(klass, self).__init__(*args)
class Temp(object):
__metaclass__ = klassMeta
Temp.__name__ = klass.__name__
klassMeta.__name__ = "{}Meta".format(klass.__name__)
return Temp
@singleton
class TestSingleton(object):
def __init__(self):
self.a = 22
self.b = 44
def hey(self):
return self.a + self.b

View File

@ -1,15 +0,0 @@
from . import singleton
def test_singleton():
@singleton.singleton
class TestSingleton(object):
def __init__(self):
self.a = 22
self.b = 44
def hey(self):
return self.a + self.b
assert TestSingleton.a == 22
assert TestSingleton.hey() == 66

View File

@ -1,49 +0,0 @@
import heapq
def get_skyline(buildings):
result = []
active_buildings = []
last_index = -1
def add_entry(index, height):
last_height = result[-1][1] if result else 0
if height != last_height:
result.append([index, height])
def handle_next_active_building():
(negative_height, end_index) = heapq.heappop(active_buildings)
while active_buildings and active_buildings[0][1] <= end_index:
heapq.heappop(active_buildings)
new_height = -active_buildings[0][0] if active_buildings else 0
add_entry(end_index, new_height)
def add_entry_for_last():
if active_buildings:
add_entry(last_index, -active_buildings[0][0])
for (left_index, right_index, height) in buildings:
if last_index > -1 and left_index != last_index:
# We have to do this here inside this if statement to handle the
# case where multiple building (potentially having different
# heights) start on the same index.
add_entry_for_last()
while active_buildings and active_buildings[0][1] < left_index:
handle_next_active_building()
heapq.heappush(active_buildings, (-height, right_index))
last_index = left_index
add_entry_for_last()
while active_buildings:
handle_next_active_building()
return result
if __name__ == '__main__':
print(get_skyline([[1,9,10],[1,7,15],[5,12,12],[15,20,10],[19,24,8], [26, 100, 100]]))

View File

@ -1,222 +0,0 @@
#!/usr/bin/env python
class SlashGrid(object):
def __init__(self, slash_array):
self.slash_array = slash_array
self.nodes = [
[Node(row_index, column_index, self) for column_index, value in enumerate(row)]
for row_index, row in enumerate(slash_array)
]
@property
def width(self):
return len(self.nodes[0])
@property
def height(self):
return len(self.nodes)
def run(self, in_between_searches=lambda x: x):
count = 0
for node_row in self.nodes:
for node in node_row:
if not node.top_visited:
in_between_searches(self)
count += 1
node.search('top', tag=count)
if not node.bottom_visited:
in_between_searches(self)
count += 1
node.search('bottom', tag=count)
in_between_searches(self)
return count
def __str__(self):
return '\n'.join(self.grid_row_string(row) for row in self.nodes)
def grid_row_string(self, row):
node_strings = [
node.string_lines for node in row
]
return '\n'.join(''.join(string_collection) for string_collection in zip(*node_strings))
class OutOfBoundsError(Exception):
pass
class Node(object):
_right_top = ('right', 'top')
_right_bottom = ('right', 'bottom')
_left_top = ('left', 'top')
_left_bottom = ('left', 'bottom')
_directions_map = {
True: (_left_top, _right_bottom),
False: (_left_bottom, _right_top)
}
_opposites = {
'left': 'right',
'right': 'left',
'top': 'bottom',
'bottom': 'top'
}
def __init__(self, row_index, column_index, grid):
self.row_index = row_index
self.column_index = column_index
self.grid = grid
self.top_visited = False
self.bottom_visited = False
@property
def string_lines(self):
if self.forward:
return [''.join([self.string_for_visited(self.top_visited), '/']),
''.join(['/', self.string_for_visited(self.bottom_visited)])]
else:
return [''.join(['\\', self.string_for_visited(self.top_visited)]),
''.join([self.string_for_visited(self.bottom_visited), '\\'])]
@staticmethod
def string_for_visited(visited):
if visited is True:
return 'X'
elif visited is False:
return ' '
else:
string = str(visited)
if isinstance(visited, int):
return str(visited)
if len(string) > 1:
return '`'
else:
return string
@property
def forward(self):
return self.grid.slash_array[self.row_index][self.column_index]
def directions_from(self, edge):
for direction_pair in self._directions_map[self.forward]:
if edge in direction_pair:
return direction_pair
else:
raise Exception()
def opposite(self, edge):
return self._opposites[edge]
def edge_visited(self, edge):
return getattr(self, self.edge_visited_string(edge))
def edge_visited_string(self, edge):
return '{0}_visited'.format(edge)
def visit_edge(self, edge, tag):
was_unvisited = not self.edge_visited(edge)
if was_unvisited:
setattr(self, self.edge_visited_string(edge), tag)
return was_unvisited
def search(self, edge, tag=True):
was_unvisited = self.visit_edge(edge, tag)
if not was_unvisited:
return
directions = self.directions_from(edge)
for travel_edge in directions:
try:
getattr(self, travel_edge).search(self.opposite(travel_edge), tag=tag)
except OutOfBoundsError:
pass
@property
def left_visited(self):
if self.forward:
return self.top_visited
else:
return self.bottom_visited
@property
def right_visited(self):
if self.forward:
return self.bottom_visited
else:
return self.top_visited
@right_visited.setter
def right_visited(self, value):
if self.forward:
self.bottom_visited = value
else:
self.top_visited = value
@left_visited.setter
def left_visited(self, value):
if self.forward:
self.top_visited = value
else:
self.bottom_visited = value
@property
def left(self):
if self.column_index <= 0:
raise OutOfBoundsError()
return self.grid.nodes[self.row_index][self.column_index-1]
@property
def right(self):
if self.column_index > self.grid.width - 2:
raise OutOfBoundsError()
return self.grid.nodes[self.row_index][self.column_index + 1]
@property
def top(self):
if self.row_index <= 0:
raise OutOfBoundsError()
return self.grid.nodes[self.row_index - 1][self.column_index]
@property
def bottom(self):
if self.row_index > self.grid.height-2:
raise OutOfBoundsError()
return self.grid.nodes[self.row_index + 1][self.column_index]
if __name__ == '__main__':
def print_grid(grid):
print "X"*100
print grid
def square(size, in_between_searches=lambda x: x):
import random
sg = SlashGrid([[bool(random.randint(0, 1)) for _ in range(size)] for _ in range(size)])
size = sg.run(in_between_searches)
return size
# SlashGrid([
# [True, False, True],
# [False, True, True]
# ]).run(print_grid)
# SlashGrid([
# [True, True, True],
# [False, False, False],
# [True, False, True]
# ]).run(print_grid)
# SlashGrid([
# [True, True, True, False, True],
# [False, False, False, True, False],
# [True, False, True, False, False],
# [True, True, True, False, True],
# ]).run(print_grid)
# SlashGrid([
# [True, True, True, False, True, False],
# [False, False, True, False, False, False],
# [True, True, True, False, False, False],
# [True, False, False, True, True, True],
# [True, False, False, True, False, False],
# [True, True, True, False, True, False]
# ]).run(print_grid)
print square(5, in_between_searches=print_grid)

View File

@ -1,32 +0,0 @@
def exact_match(text_body, query):
for index in range(len(text_body) - len(query) + 1):
for query_index, query_character in enumerate(query):
if query_character != text_body[index+query_index]:
break
else:
return True
return False
def wildcard_match(text_body, query):
for index in range(len(text_body) - len(query) + 1):
for query_index, query_character in enumerate(query):
if (query_character != '.' and
query_character != text_body[index+query_index]):
break
else:
return True
return False
def match_maybe(text_body, query):
if query == "":
return True
if len(query) > 1 and query[1] == '?':
return ((len(text_body) > 0 and text_body[0] == query[0] and match_maybe(text_body[1:], query[2:]))
or match_maybe(text_body, query[2:]))
return text_body[0] == query[0] and match_maybe(text_body[1:], query[1:])
def match_maybe_all(text_body, query):
for starting_index in range(len(text_body)):
if match_maybe(text_body[starting_index:], query):
return True
return False

View File

@ -1,86 +0,0 @@
goodpuzzle = [
[1,2,3,4,5,6,7,8,9],
[4,5,6,7,8,9,1,2,3],
[7,8,9,1,2,3,4,5,6],
[2,3,4,5,6,7,8,9,1],
[5,6,7,8,9,1,2,3,4],
[8,9,1,2,3,4,5,6,7],
[3,4,5,6,7,8,9,1,2],
[6,7,8,9,1,2,3,4,5],
[9,1,2,3,4,5,6,7,8]
]
badpuzzle1 = [
[1,2,3,4,5,6,7,9,8],
[4,5,6,7,8,9,1,2,3],
[7,8,9,1,2,3,4,5,6],
[2,3,4,5,6,7,8,9,1],
[5,6,7,8,9,1,2,3,4],
[8,9,1,2,3,4,5,6,7],
[3,4,5,6,7,8,9,1,2],
[6,7,8,9,1,2,3,4,5],
[9,1,2,3,4,5,6,7,8]
]
badpuzzle2 = [
[1,2,3,4,5,6,7,2,9],
[4,5,6,7,8,9,1,8,3],
[7,8,9,1,2,3,4,5,6],
[2,3,4,5,6,7,8,9,1],
[5,6,7,8,9,1,2,3,4],
[8,9,1,2,3,4,5,6,7],
[3,4,5,6,7,8,9,1,2],
[6,7,8,9,1,2,3,4,5],
[9,1,2,3,4,5,6,7,8]
]
badpuzzle3 = [
[1,2,3,4,5,6,7,8,9],
[4,5,6,7,8,9,1,2,3],
[7,8,9,1,2,3,4,5,6],
[2,3,4,5,6,7,8,9,1],
[5,6,7,8,9,1,2,3,4],
[3,4,5,6,7,8,9,1,2],
[8,9,1,2,3,4,5,6,7],
[6,7,8,9,1,2,3,4,5],
[9,1,2,3,4,5,6,7,8]
]
one_to_nine = set(range(1, 10))
def is_valid_sudoku_puzzle(sudoku_grid):
for row in sudoku_grid:
if set(row) != one_to_nine:
return False
for i in range(9):
column = [sudoku_grid[j][i] for j in range(9)]
if set(column) != one_to_nine:
return False
for i in range(3):
for j in range(3):
subgrid_elements = get_subgrid_elements(i, j, sudoku_grid)
if set(subgrid_elements) != one_to_nine:
return False
return True
def get_subgrid_elements(subgrid_row, subgrid_column, sudoku_grid, subgrid_size=3):
subgrid_row_start = subgrid_row * subgrid_size
subgrid_column_start = subgrid_column * subgrid_size
subgrid_elements = []
for i in range(subgrid_row_start, subgrid_row_start + subgrid_size):
subgrid_elements += sudoku_grid[i][subgrid_column_start:subgrid_column_start+subgrid_size]
return subgrid_elements
print(is_valid_sudoku_puzzle(goodpuzzle))
print(is_valid_sudoku_puzzle(badpuzzle1))
print(is_valid_sudoku_puzzle(badpuzzle2))
print(is_valid_sudoku_puzzle(badpuzzle3))

View File

@ -1,27 +0,0 @@
import datetime
from invoke import task, run
@task
def histogram(ignore=''):
result = run('git rev-list --all')
date_to_adds = {}
date_to_deletes = {}
for sha in result.stdout.split('\n'):
result = run('git diff-excluding {1} {0}~1 {0} --numstat'.format(sha, ignore), hide=True)
added, deleted = get_total(result.stdout)
iso8601 = run('git log {0} --pretty=format:%ai -1'.format(sha), hide=True).stdout.strip()
commit_date = datetime.datetime.strptime(iso8601, "%Y-%m-%dT%H:%M:%S %z").date()
date_to_adds[commit_date] = date_to_adds.get(commit_date) + added
date_to_deletes[commit_date] = date_to_deletes.get(commit_date) + deleted
print date_to_adds
print date_to_deletes
def get_total(output):
try:
return sum(int(line.split()[0]) for line in output.split('\n')), sum(int(line.split()[1]) for line in output.split('\n'))
except:
import ipdb; ipdb.set_trace()

View File

@ -1,27 +0,0 @@
#!/usr/bin/env python
from tox._config import parseconfig
from yaml import dump
class TravisFromTox(object):
def __init__(self, tox_config):
self._tox_config = tox_config
def build_travis_dict(self):
return {
'language': 'python',
'install': ['pip install "tox>=1.8.0"'],
'script': 'tox',
'env': self._get_environment_variables()
}
def _get_environment_variables(self):
return ['TOXENV={0}'.format(env) for env in self._tox_config.envlist]
def build_travis_yaml(self):
return dump(self.build_travis_dict(), default_flow_style=False)
if __name__ == '__main__':
print TravisFromTox(parseconfig()).build_travis_yaml()

View File

@ -1,10 +0,0 @@
#!/usr/bin/env python
import sys
def unescape(string):
print(eval(string))
if __name__ == '__main__':
unescape(sys.stdin.read())

View File

@ -1,62 +0,0 @@
import asyncio
import random
import queue
import requests
import threading
class WebhookHandler(object):
def __init__(self, callback_uri='http://whatever'):
self.queue = queue.Queue()
self.callback_uri = callback_uri
def enqueue(self, webhook_request):
self.queue.put(webhook_request)
def webhook_worker(self):
while True:
callback_request = self.queue.get()
self.run_request(callback_request)
self.queue.task_done()
def flaky_request(self, callback_request):
random_value = random.random()
print(random_value)
if random_value > .9:
return 500
r = requests.get(self.callback_uri, params=callback_request)
return r.status_code
def run_request(self, request):
status_code = self.flaky_request(request)
if status_code != 200:
asyncio.run(self.do_retry(callback_request))
else:
print("made request")
async def do_retry(self, request, delay=1):
await asyncio.sleep(delay)
print("Retried request")
self.run_request(request)
if __name__ == '__main__':
handler = WebhookHandler(callback_uri="https://www.google.com/")
thread_count = 10
for _ in range(1000):
handler.enqueue({})
threads = []
for _ in range(thread_count):
thread = threading.Thread(target=handler.webhook_worker, daemon=False)
thread.start()
threads.append(thread)
for _ in range(1000):
handler.enqueue({})
loop = asyncio.get_event_loop()
try:
loop.run_forever()
finally:
loop.close()

View File

@ -1,193 +0,0 @@
import argparse
import logging
import re
import subprocess
import time
from lxml import etree
from xpath import dxpb, xpb
import log_util
log = logging.getLogger(__name__)
def get_stdout_from_command(command):
return subprocess.Popen(command, stdout=subprocess.PIPE).stdout.read()
def below_threshold_trigger(threshold):
return lambda status_info: int(status_info['RSSI']) < threshold
class Network(object):
def __init__(self, ssid, password,
should_switch=below_threshold_trigger(-68)):
self.ssid = ssid
self.password = password
self.should_switch = should_switch
@property
def login_command(self):
return ["networksetup", "-setairportnetwork", "en0",
self.ssid, self.password]
def login(self):
log.debug("Reponse from connect: {0}".format(
get_stdout_from_command(self.login_command)
))
class OSXXMLStatusRetriever(object):
def _get_status_xml(self):
return get_stdout_from_command(['airport', '-I', '--xml'])
def _get_status_tree(self):
return etree.fromstring(self._get_status_xml())
_signal_strength_key_xpb = xpb.dict.key.text_contains_("RSSI_CTL_LIST")
def get_status_dict(self):
status_tree = self._get_status_tree()
signal_strength_array = self._signal_strength_key_xpb.one_(status_tree).getnext()
signal_strengths = xpb.integer.text_.apply_(signal_strength_array)
return sum([int(ss) for ss in signal_strengths]) / len(signal_strengths)
__call__ = get_status_dict
class OSXStatusRetriever(object):
KEY_REMAP = {
'agrCtlRSSI': 'RSSI',
'maxRate': 'max_rate',
}
status_output_line_regex = re.compile("^([^\n]*?): ([^\n]*?)$")
def _get_status_text(self):
return get_stdout_from_command(['airport', '-I'])
@classmethod
def _remap_key(cls, key):
return cls.KEY_REMAP.get(key, key)
def get_status_dict(self):
return {self._remap_key(match.group(1).strip()): match.group(2)
for match in [self.status_output_line_regex.match(line.strip())
for line in self._get_status_text().split('\n')]
if match is not None}
__call__ = get_status_dict
class OSXSSIDToRSSI(object):
def _get_scan_xml(self):
return get_stdout_from_command(['airport', '--scan', '--xml'])
def _get_scan_tree(self):
xml = self._get_scan_xml()
for i in range(10):
if xml:
break
xml = self._get_scan_xml()
else:
Exception("Airport command did not provide output.")
return etree.fromstring(xml)
_network_xpb = dxpb.array.dict
_ssid_xpb = xpb.key.text_contains_("SSID_STR")
_rssi_xpb = xpb.key.text_contains_("RSSI")
def _network_elements(self):
return self._network_xpb.apply_(self._get_scan_tree())
def get(self):
network_elements = self._network_elements()
ssid_to_rssi = {}
for network_element in network_elements:
ssid = self._get_ssid(network_element)
rssi = self._get_rssi(network_element)
if ssid not in ssid_to_rssi or rssi > ssid_to_rssi[ssid]:
ssid_to_rssi[ssid] = rssi
return ssid_to_rssi
def _get_ssid(self, network_element):
try:
return self._ssid_xpb.one_(network_element).getnext().text
except:
return None
def _get_rssi(self, network_element):
try:
return int(self._rssi_xpb.one_(network_element).getnext().text)
except:
return 0
__call__ = get
class WiFiAutoSwitcher(object):
def __init__(self, networks, status_getter=OSXStatusRetriever(),
ssid_to_rssi_getter=OSXSSIDToRSSI()):
self._networks = {network.ssid: network for network in networks}
self._get_status = status_getter
self._ssid_to_rssi = ssid_to_rssi_getter
def switch_if_necessary(self):
status_dict = self._get_status()
log.debug(status_dict)
network = None
if 'SSID' in status_dict:
network = self._networks.get(status_dict['SSID'])
if network is None:
# Don't do anything if the current network is not recognized
return
if not network or network.should_switch(status_dict):
log.debug("Attempting to switch networks from {0}, ".format(
network.ssid if network else "(Not conneted to network)"
))
new_network = self.select_known_network_with_best_rssi()
if new_network:
if network and new_network.ssid == network.ssid:
log.debug("Switch triggered but connected network is still best.")
else:
new_network.login()
else:
log.debug("No switch deemed necessary.")
def select_known_network_with_best_rssi(self):
ssid_to_rssi = self._ssid_to_rssi()
log.debug("Selecting best network using: {0}".format(ssid_to_rssi))
network = max(
self._networks.values(),
key=lambda network: ssid_to_rssi.get(network.ssid, -1000000)
)
if network.ssid in ssid_to_rssi:
log.debug("selected: {0}".format(network.ssid))
return network
else:
log.debug("No matching networks were found.")
if __name__ == '__main__':
log_util.enable_logger(__name__)
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--network', nargs='+', type=str, action='append', dest='networks')
parser.add_argument('-s', '--sleep-time', type=int, default=4.0, dest='sleep_time')
args = parser.parse_args()
network_pairs = args.networks
for network_pair in network_pairs:
assert len(network_pair) == 2
auto_switcher = WiFiAutoSwitcher(
[Network(*ssid_password) for ssid_password in network_pairs]
)
while True:
time.sleep(args.sleep_time)
auto_switcher.switch_if_necessary()

View File

@ -1,172 +0,0 @@
from cached_property import cached_property
class XPathBuilder(object):
def __init__(self, nodes=(), relative=True, direct_child=False):
self.nodes = tuple(nodes)
self.relative = relative
self.direct_child = direct_child
@cached_property
def xpath(self):
return ('.' if self.relative else '') + ''.join(node.xpath
for node in self.nodes)
@property
def or_(self):
return self.update_final_node(self.nodes[-1].make_or)
@property
def text_(self):
return self.update_final_node(
self.nodes[-1](selected_attribute=XPathNode.text)
)
def add_node(self, **kwargs):
if 'direct_child' not in kwargs:
kwargs['direct_child'] = self.direct_child
return type(self)(self.nodes + (XPathNode(**kwargs),),
relative=self.relative)
def __getattr__(self, attr):
return self.add_node(element=attr)
def update_final_node(self, updated_final_node):
return type(self)(self.nodes[:-1] + (updated_final_node,),
relative=self.relative,
direct_child=self.direct_child)
def __call__(self, *predicates, **attributes):
direct_child = attributes.pop('direct_child', None)
assert len(self.nodes)
updated_final_node = self.nodes[-1](predicates=predicates,
attributes=attributes,
direct_child=direct_child)
return self.update_final_node(updated_final_node)
def attribute_contains(self, attribute, contains_string):
updated_final_node = self.nodes[-1].add_contains_predicates(
((attribute, contains_string),)
)
return self.update_final_node(updated_final_node)
def with_classes(self, *classes):
return self.update_final_node(self.nodes[-1].with_classes(classes))
def select_attribute_(self, attribute, elem=None):
update_final_node = self.nodes[-1](selected_attribute=attribute)
builder = self.update_final_node(update_final_node)
if elem is not None:
return builder.apply_(elem)
else:
return builder
def text_contains_(self, contained_text):
updated_final_node = self.nodes[-1].text_contains(contained_text)
return self.update_final_node(updated_final_node)
with_class = with_classes
def apply_(self, tree):
return tree.xpath(self.xpath)
def one_(self, tree):
return self.apply_(tree)[0]
def get_text_(self, tree):
return self.apply_(tree)[0].text_content()
def __repr__(self):
return '{0}("{1}")'.format(type(self).__name__, self.xpath)
class XPathNode(object):
text = object()
@staticmethod
def contains_class(class_attribute, contained_class):
return "contains(concat(' ',normalize-space(@{0}),' '),' {1} ')".\
format(class_attribute, contained_class)
@staticmethod
def contains_attribute(attribute, contained_string):
return "contains(@{0}, '{1}')".format(attribute, contained_string)
@staticmethod
def attribute_equal(attribute, value):
return "@{0} = '{1}'".format(attribute, value)
def __init__(self, element='*', attributes=None, predicates=None,
direct_child=False, use_or=False, selected_attribute=None):
self.element = element
self.predicates = tuple(predicates) if predicates else ()
if attributes:
self.predicates += tuple([self.attribute_equal(key, value)
for key, value in attributes.items()])
self.direct_child = direct_child
self.use_or = use_or
self.selected_attribute = selected_attribute
@property
def make_or(self):
return self(use_or=True)
@property
def separator(self):
return '/' if self.direct_child else '//'
@property
def xpath(self):
return '{0}{1}{2}{3}'.format(self.separator, self.element,
self.predicate_string,
self.selected_attribute_string)
@property
def predicate_joiner(self):
return ' or ' if self.use_or else ' and '
@property
def predicate_string(self):
if self.predicates:
predicate = self.predicate_joiner.join(self.predicates)
return '[ {0} ]'.format(predicate)
else:
return ''
@property
def selected_attribute_string(self):
if self.selected_attribute is self.text:
return '/text()'
return '/@{0}'.format(self.selected_attribute) \
if self.selected_attribute else ''
def __call__(self, element=None, predicates=(), attributes=None,
direct_child=None, use_or=False, selected_attribute=None):
direct_child = (self.direct_child
if direct_child is None
else direct_child)
element = self.element if element is None else element
new_predicates = self.predicates + tuple(predicates)
return type(self)(element, attributes, new_predicates,
direct_child, use_or, selected_attribute)
def with_classes(self, classes):
predicates = tuple(self.contains_class('class', contained_class)
for contained_class in classes)
return self(predicates=predicates)
def add_contains_predicates(self, kv_pairs):
predicates = tuple(self.contains_attribute(attribute, contains_string)
for attribute, contains_string in kv_pairs)
return self(predicates=predicates)
def text_contains(self, contained_text):
return self(predicates=("contains(text(),'{0}')".
format(contained_text),))
xpb = XPathBuilder()
dxpb = XPathBuilder(direct_child=True)