Click here for the first post, which contains the context of this series.
Item #51: Prefer class decorators over metaclasses.
Consider the following decorator:
from functools import wraps


def func_log(func):
    """Decorator that prints each call's arguments and its outcome.

    The outcome is either the return value or the exception raised;
    exceptions are re-raised untouched after being logged.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
        except Exception as exception:
            # Record the exception so the log line shows it, then propagate.
            result = exception
            raise
        else:
            return result
        finally:
            # Runs on both the success and the failure path.
            print(f'{func.__name__}({args},{kwargs})->{result}')
    return wrapper
from functools import wraps


def func_log(func):
    """Logging decorator: print every invocation's arguments plus its
    result, or the exception it raised (which is propagated unchanged)."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            outcome = func(*args, **kwargs)
            return outcome
        except Exception as exc:
            outcome = exc
            raise
        finally:
            print(f'{func.__name__}({args},{kwargs})->{outcome}')
    return wrapper
Suppose that you want to use it to log the operations performed on a dictionary:
class FuncLogDict(dict):
    """dict subclass whose methods are individually wrapped with func_log."""

    @func_log
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    @func_log
    def __getitem__(self, *args, **kwargs):
        # BUG FIX: the original discarded the looked-up value, so
        # d['foo'] always evaluated to None. Return it.
        return super().__getitem__(*args, **kwargs)

    @func_log
    def __setitem__(self, *args, **kwargs):
        super().__setitem__(*args, **kwargs)
    # ...


d = FuncLogDict()
d['foo'] = 'bar'
d['foo']
class FuncLogDict(dict):
    """dict subclass with per-method func_log decoration (repeated example)."""

    @func_log
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    @func_log
    def __getitem__(self, *args, **kwargs):
        # BUG FIX: without the return, lookups always produced None.
        return super().__getitem__(*args, **kwargs)

    @func_log
    def __setitem__(self, *args, **kwargs):
        super().__setitem__(*args, **kwargs)
    # ...


d = FuncLogDict()
d['foo'] = 'bar'
d['foo']
This is redundant. Use a class decorator instead:
import types

# Attribute types that count as wrappable callables.
# NOTE(review): slot wrappers (types.WrapperDescriptorType, e.g.
# dict.__setitem__) are not listed, so such methods are never wrapped —
# confirm this is intended for the logging example below.
log_types = (
    types.MethodType,
    types.FunctionType,
    types.BuiltinMethodType,
    types.BuiltinFunctionType,
    types.MethodDescriptorType,
    types.ClassMethodDescriptorType,
)


def class_log(instance):
    """Class decorator: replace every callable attribute with a
    func_log-wrapped version, then hand the mutated class back."""
    for key in dir(instance):
        attr = getattr(instance, key)
        if not isinstance(attr, log_types):
            continue
        setattr(instance, key, func_log(attr))
    return instance
@class_log
class ClassLogDict(dict):
    """A dict whose callable attributes are wrapped with logging by class_log."""


# Exercise the logged dictionary.
d = ClassLogDict()
d['foo'] = 'bar'
d['foo']
import types

# Repeat of the class-decorator example: the callable types to wrap.
log_types = (
    types.MethodType,
    types.FunctionType,
    types.BuiltinMethodType,
    types.BuiltinFunctionType,
    types.MethodDescriptorType,
    types.ClassMethodDescriptorType
)


def class_log(instance):
    """Wrap each callable attribute of the decorated class with func_log."""
    for name in dir(instance):
        candidate = getattr(instance, name)
        if isinstance(candidate, log_types):
            setattr(instance, name, func_log(candidate))
    return instance


@class_log
class ClassLogDict(dict):
    """dict variant whose wrappable method calls are logged."""


d = ClassLogDict()
d['foo'] = 'bar'
d['foo']
Item #52: Use subprocess to manage child processes.
I skip this item since it depends heavily on the operating system on which Python is run but recommend perusing the documentation of subprocess and refreshing one's memory about pipes.
Item #53: Use threads for blocking I/O, avoid for parallelism.
Although the global interpreter lock (GIL) prevents threads from executing Python bytecode in parallel, threads are still useful for performing blocking I/O concurrently with computation, since the GIL is released while waiting on I/O.
from threading import Thread


class Factorize(Thread):
    """Thread that computes every factor of *number* into self.factors."""

    def __init__(self, number):
        super().__init__()
        self.number = number

    def run(self):
        # Trial division over 2..number-1, bracketed by the trivial
        # factors 1 and number itself.
        self.factors = [1]
        for candidate in range(2, self.number):
            if self.number % candidate == 0:
                self.factors.append(candidate)
        self.factors.append(self.number)
# Fan out one thread per number, then collect results after joining.
threads = []
for number in [2139079, 1214759, 1516637, 1852285]:
    factorizer = Factorize(number)
    factorizer.start()
    threads.append(factorizer)

for thread in threads:
    thread.join()
    # Reading .factors here is safe: join() guarantees run() finished.
    print(f'{thread.number}: {thread.factors}')
Item #54: Use Lock to prevent data races in threads.
Consider
from threading import Thread


class Counter:
    """Unsynchronized counter — intentionally racy when shared by threads."""

    def __init__(self):
        self.count = 0

    def increment(self):
        # Not atomic: the read-modify-write can interleave across threads,
        # which is exactly what this example demonstrates.
        self.count += 1
def worker(counter, total):
    """Invoke counter.increment() exactly *total* times."""
    remaining = total
    while remaining > 0:
        counter.increment()
        remaining -= 1
# Five threads hammer one shared Counter; without a lock the final
# count usually comes up short of the expected 500000.
total = 10 ** 5
counter = Counter()

threads = []
for _ in range(5):
    t = Thread(target=worker, args=(counter, total))
    t.start()
    threads.append(t)
for t in threads:
    t.join()

print('expected:', total * 5, 'actual:', counter.count)
A run of this code gave me the output:
expected: 500000 actual: 406246
This is due to a race condition. One way to address it is to use the Lock class, which is a mutex:
# ...
from threading import Lock


class Counter:
    """Thread-safe counter: increments are serialized by a mutex."""

    def __init__(self):
        self.lock = Lock()
        self.count = 0

    def increment(self):
        # Holding the lock makes the read-modify-write atomic with
        # respect to other increment() calls.
        with self.lock:
            self.count += 1
# ...
Item #55: Use Queue to coordinate work between threads.
Suppose that you want to do something (ideally I/O bound) that can be structured as a pipeline. You can use multiple threads to significantly speed it up, and you can use Queue to coordinate them. Here is an abstract example:
from queue import Queue
from threading import Thread
class MyQueue(Queue):
    """A Queue that can be closed and iterated until closed.

    Each call to close() stops exactly one consumer; iteration marks
    every retrieved item (including the sentinel) as done so join()
    works as expected.
    """

    # Unique end-of-stream marker; must be compared by identity.
    SENTINEL = object()

    def close(self):
        """Signal one iterating consumer to stop."""
        self.put(self.SENTINEL)

    def __iter__(self):
        while True:
            item = self.get()
            try:
                # BUG FIX: compare by identity, not equality — an item
                # whose __eq__ returns True for anything would otherwise
                # end the iteration early and be silently dropped.
                if item is self.SENTINEL:
                    return
                yield item
            finally:
                self.task_done()
class MyWorker(Thread):
    """Pipeline stage thread: applies func to each item drawn from
    in_queue and pushes the result onto out_queue."""

    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        # Iteration ends when the input queue yields its sentinel.
        transform = self.func
        emit = self.out_queue.put
        for item in self.in_queue:
            emit(transform(item))
def func_1(item):
    """Stage-1 placeholder: pass the item through unchanged."""
    return item


def func_2(item):
    """Stage-2 placeholder: pass the item through unchanged."""
    return item


def func_3(item):
    """Stage-3 placeholder: pass the item through unchanged."""
    return item
# Wire up the three-stage pipeline:
# queue_1 -> func_1 workers -> queue_2 -> func_2 workers -> queue_3
#         -> func_3 workers -> queue_4
queue_1 = MyQueue()
queue_2 = MyQueue()
queue_3 = MyQueue()
queue_4 = MyQueue()

# Ten worker threads per stage.
threads = (
    [MyWorker(func_1, queue_1, queue_2) for _ in range(10)]
    + [MyWorker(func_2, queue_2, queue_3) for _ in range(10)]
    + [MyWorker(func_3, queue_3, queue_4) for _ in range(10)]
)
for thread in threads:
    thread.start()

# Feed 100 items into the head of the pipeline.
for i in range(100):
    queue_1.put(i)

# Close each stage's input queue once per worker (every worker consumes
# exactly one sentinel), then wait for that stage to drain before
# closing the next one downstream.
for queue in [queue_1, queue_2, queue_3]:
    for _ in range(10):
        queue.close()
    queue.join()

for thread in threads:
    thread.join()

print(queue_4.qsize())