Effective Python 51 - 55

Click here for the first post, which contains the context of this series.

Item #51: Prefer class decorators over metaclasses.

Consider the following decorator:

from functools import wraps
def func_log(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
            return result
        except Exception as exception:
            result = exception
            raise
        finally:
            print(f'{func.__name__}({args},{kwargs})->{result}')
    return wrapper
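
For example, applied to a throwaway function (the name add is mine, purely for illustration), every call prints one line:

@func_log
def add(a, b):
    return a + b
add(1, 2)  # prints: add((1, 2),{})->3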

Suppose that you want to use it to log a dictionary:

class FuncLogDict(dict):
    @func_log
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
    @func_log
    def __getitem__(self, *args, **kwargs):
        return super().__getitem__(*args, **kwargs)
    @func_log
    def __setitem__(self, *args, **kwargs):
        super().__setitem__(*args, **kwargs)
    # ...
d = FuncLogDict()
d['foo'] = 'bar'
d['foo']

This is redundant: every method needs its own decoration, and it is easy to forget one. Use a class decorator instead:

import types
log_types = (
    types.MethodType,
    types.FunctionType,
    types.BuiltinMethodType,
    types.BuiltinFunctionType,
    types.MethodDescriptorType,
    types.ClassMethodDescriptorType
)
def class_log(klass):
    # Wrap every callable attribute of the class, including the
    # methods inherited from dict, with the logging decorator.
    for key in dir(klass):
        value = getattr(klass, key)
        if isinstance(value, log_types):
            setattr(klass, key, func_log(value))
    return klass
@class_log
class ClassLogDict(dict):
    pass
d = ClassLogDict()
d['foo'] = 'bar'
d['foo']
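
For comparison, here is a rough sketch of what the metaclass route would look like (the names LogMeta and MetaLogDict are mine, for illustration); it has to intercept class creation:

class LogMeta(type):
    def __new__(meta, name, bases, class_dict):
        klass = super().__new__(meta, name, bases, class_dict)
        for key in dir(klass):
            value = getattr(klass, key)
            if isinstance(value, log_types):
                setattr(klass, key, func_log(value))
        return klass
class MetaLogDict(dict, metaclass=LogMeta):
    pass

A class can have only one metaclass, while class decorators are plain functions that stack freely, which is why the book prefers them.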

Item #52: Use subprocess to manage child processes.

I am skipping this item, since it depends heavily on the operating system Python runs on, but I recommend perusing the subprocess documentation and refreshing one's memory about pipes.
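
Still, a minimal sketch of the common case may be useful (the command is arbitrary and POSIX-flavored; substitute one that exists on your system):

import subprocess
# Run a child process, capture its output, and raise on a
# non-zero exit status.
result = subprocess.run(
    ['echo', 'Hello from the child'],
    capture_output=True,
    encoding='utf-8',
    check=True,
)
print(result.stdout)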

Item #53: Use threads for blocking I/O, avoid for parallelism.

The global interpreter lock (GIL) allows only one thread at a time to execute Python bytecode, so the CPU-bound factorization below gains nothing from threading; threads are nevertheless useful for doing blocking I/O at the same time as computation.

from threading import Thread
class Factorize(Thread):
    def __init__(self, number):
        super().__init__()
        self.number = number
    def run(self):
        # CPU-bound: collect every divisor of self.number.
        self.factors = [1]
        for i in range(2, self.number):
            if self.number % i == 0:
                self.factors.append(i)
        self.factors.append(self.number)
threads = []
for number in [2139079, 1214759, 1516637, 1852285]:
    thread = Factorize(number)
    thread.start()
    threads.append(thread)
for thread in threads:
    thread.join()
    print(f'{thread.number}: {thread.factors}')
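
To see where threads do help, here is a minimal sketch in which the blocking call is simulated with time.sleep (which, like real blocking I/O, releases the GIL); five 0.1-second waits overlap and finish in roughly 0.1 seconds instead of 0.5:

import time
from threading import Thread
def slow_io():
    time.sleep(0.1)  # stands in for a blocking system call
start = time.time()
threads = [Thread(target=slow_io) for _ in range(5)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(f'took {time.time() - start:.2f}s')  # ~0.10, not 0.50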

Item #54: Use Lock to prevent data races in threads.

Consider

from threading import Thread
class Counter:
    def __init__(self):
        self.count = 0
    def increment(self):
        self.count += 1
def worker(counter, total):
    for _ in range(total):
        counter.increment()
total = 10 ** 5
counter = Counter()
threads = []
for _ in range(5):
    thread = Thread(target=worker, args=(counter, total))
    thread.start()
    threads.append(thread)
for thread in threads:
    thread.join()
print('expected:', total * 5, 'actual:', counter.count)

A run of this code gave me the output:

expected: 500000 actual: 406246

This is due to a data race: count += 1 is really three operations (read the current value, add one, write it back), and the interpreter may switch threads between any of them, losing increments. One way to address it is to use the Lock class, which is a mutex:

# ...
from threading import Lock
class Counter:
    def __init__(self):
        self.lock = Lock()
        self.count = 0
    def increment(self):
        with self.lock:  # only one thread at a time may pass
            self.count += 1
# ...
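
You can see the non-atomicity directly by disassembling an unlocked increment; the read, the addition, and the write appear as separate bytecode instructions (exact opcodes vary across Python versions):

import dis
def increment(counter):
    counter.count += 1
dis.dis(increment)  # shows separate LOAD_ATTR / add / STORE_ATTR steps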

Item #55: Use Queue to coordinate work between threads.

Suppose that you want to do something (ideally I/O-bound) that can be structured as a pipeline. You can use multiple threads to speed it up significantly, and you can use Queue to coordinate them. Here is an abstract example:

from queue import Queue
from threading import Thread
class MyQueue(Queue):
    SENTINEL = object()
    def close(self):
        # Each sentinel tells exactly one consuming worker to exit.
        self.put(self.SENTINEL)
    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:  # compare by identity
                    return
                yield item
            finally:
                self.task_done()
class MyWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
    def run(self):
        for item in self.in_queue:
            self.out_queue.put(self.func(item))
def func_1(item):
    return item
def func_2(item):
    return item
def func_3(item):
    return item
queue_1 = MyQueue()
queue_2 = MyQueue()
queue_3 = MyQueue()
queue_4 = MyQueue()
threads = [
    MyWorker(func_1, queue_1, queue_2) for _ in range(10)
] + [
    MyWorker(func_2, queue_2, queue_3) for _ in range(10)
] + [
    MyWorker(func_3, queue_3, queue_4) for _ in range(10)
]
for thread in threads:
    thread.start()
for i in range(100):
    queue_1.put(i)
for queue in [queue_1, queue_2, queue_3]:
    # One sentinel per worker consuming this queue, so all ten exit.
    for _ in range(10):
        queue.close()
    queue.join()
for thread in threads:
    thread.join()
print(queue_4.qsize())
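
The final print reports 100: every item fed into queue_1 made it through all three stages into queue_4. The shutdown order matters: each queue is closed once per worker reading it, and join() ensures a stage has drained before the next one is closed.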
