Python. Потоки, процессы
Логирование
Логированием называют запись логов. Оно позволяет ответить на вопросы, что происходило, когда и при каких обстоятельствах. Без логов сложно понять, из-за чего появляется ошибка, если она возникает периодически и только при определенных условиях.
logging
import logging
logging . debug ( 'This is a debug message' )
logging . info ( 'This is an info message' )
logging . warning ( 'This is a warning message' )
logging . error ( 'This is an error message' )
logging . critical ( 'This is a critical message' )
Базовая натройка
level: Корневой логер с установленным указанным уровнем важности (severity).
filename: Указание файла логов
filemode: Режим открытия файла. По умолчанию это a, что означает добавление.
format: Формат сообщений.
Пример настройки
import logging
logging . basicConfig (
filename = 'app.log' ,
filemode = 'w' ,
format = ' %(name)s - %(levelname)s - %(message)s '
)
logging . warning ( 'This will get logged to a file' )
Что такое процесс?
Процесс — экземпляр программы во время выполнения, независимый объект, которому выделены системные ресурсы (например, процессорное время и память). Каждый процесс выполняется в отдельном адресном пространстве: один процесс не может получить доступ к переменным и структурам данных другого. Если процесс хочет получить доступ к чужим ресурсам, необходимо использовать межпроцессное взаимодействие. Это могут быть конвейеры, файлы, каналы связи между компьютерами и многое другое.
Что такое поток?
Поток — определенный способ выполнения процесса. Когда один поток изменяет ресурс процесса, это изменение сразу же становится видно другим потокам этого процесса.
Что такое форк?
1. Цель состоит в том, чтобы создать новый процесс, который становится дочерним процессом вызывающего объекта
2. Оба процесса выполнят следующую команду после системного вызова fork()
3. Две идентичные копии адресного пространства, кода и стека компьютера создаются по одной для родителя и ребенка.
Различие между процессами и потоками
И процессы, и потоки являются независимыми последовательностями выполнения. Типичное различие заключается в том, что потоки (одного и того же процесса) выполняются в общем пространстве памяти, в то время как процессы выполняются в отдельных пространствах памяти.
Что такое потоки?
Thread — это отдельный поток выполнения. Это означает, что в вашей программе могут работать две и более подпрограммы одновременно. Но разные потоки на самом деле не работают одновременно: это просто кажется.
Threading
Стандартная библиотека Python предоставляет библиотеку threading, которая содержит необходимые классы для работы с потоками. Основной класс в этой библиотеки Thread.
Чтобы запустить отдельный поток, нужно создать экземпляр потока Thread и затем запустить его с помощью метода .start()
Пример thread
import threading
import time
def thread_function ( name ):
print ( "Thread %s : starting" , name )
time . sleep ( 2 )
print ( "Thread %s : finishing" , name )
if __name__ == "__main__" :
x = threading . Thread ( target = thread_function , args =( 1 ,))
print ( "Main : before running thread" )
x . start ()
print ( "Main : wait for the thread to finish" )
print ( "Main : all done" )
Демоны потоков
В информатике daemon (демон) — это процесс, который работает в фоновом режиме.
Python потоки имеет особое значение для демонов. Демон потока (или как еще его можно назвать демонический поток) будет остановлен сразу после выхода из программы. Один из способов думать об этих определениях — считать демон потока как потоком, который работает в фоновом режиме, не беспокоясь о его завершении.
Пример процесса демона
x = threading . Thread ( target = thread_function , args =( 1 ,), deamon = True )
.join()
Чтобы указать одному потоку дождаться завершения другого потока, вам нужно вызывать .join()
Работа с несколькими потоками
До сих пор мы рассматривали пример только с двумя потоками: основным потоком и с потоком который мы создали с помощью объекта threading.Thread.
Зачастую вам нужно будет запускать несколько потоков. Давайте начнем с более сложного способа сделать это, а затем перейдем к более простому способу.
Пример
if __name__ == "__main__" :
format = " %(asctime)s : %(message)s "
logging . basicConfig ( format = format , level = logging . INFO ,
datefmt = "%H:%M:%S" )
threads = list ()
for index in range ( 3 ):
logging . info ( "Main : create and start thread %d ." , index )
x = threading . Thread ( target = thread_function , args =( index ,))
threads . append ( x )
x . start ()
for index , thread in enumerate ( threads ):
logging . info ( "Main : before joining thread %d ." , index )
thread .join()
logging . info ( "Main : thread %d done" , index )
Использование ThreadPoolExecutor
Код создает ThreadPoolExecutor как менеджер контекста, сообщая ему, сколько рабочих потоков он хочет в пуле. Затем он использует .map() для пошагового прохождения итерируемой объкта, в нашем случае range(3), передавая каждый поток в пул.
Пример
import concurrent . futures
if __name__ == "__main__" :
format = " %(asctime)s : %(message)s "
logging . basicConfig ( format = format , level = logging . INFO ,
datefmt = "%H:%M:%S" )
with concurrent . futures . ThreadPoolExecutor ( max_workers = 3 ) as executor :
executor . map ( thread_function , range ( 3 ))
Условия гонки
Условия гонки могут возникать, когда два или более потока обращаются к общему фрагменту данных или ресурсу. В следующим примере мы создадим состояние гонки, но имейте в виду, что часто условие гонки не так очевидны и они могут привести к сбивающим с толку результатам. Как вы можете себе представить, все это делает их довольно сложными для отладки.
Пример
Добавим глобальную переменную
Будем изменять её каждый раз, когда будем вызывать поток
Запустим этот код несколько раз
Классы от Thread
class CustomThread ( threading . Thread ):
def __init__ ( self , limit ):
threading . Thread . __init__ ( self )
self . _limit = limit
def run ( self ):
for i in range ( self . _limit ):
print ( f "from CustomThread: { i } " )
time . sleep ( 0.5 )
cth = CustomThread ( 3 )
cth . start ()
Принудительное завершение работы
lock = Lock()
lock.acquire() # Выполнит блокировку данного участка кода
... доступ к общим ресурсам
lock.release()
lock = Lock ()
stop_thread = False
def infinit_worker ():
print ( "Start infinit_worker()" )
while True :
print ( "-- thread work" )
lock . acquire ()
if stop_thread is True :
break
lock . release ()
sleep ( 0.1 )
print ( "Stop infinit_worker()" )
# Create and start thread
th = Thread ( target = infinit_worker )
th . start ()
sleep ( 2 )
# Stop thread
lock . acquire ()
stop_thread = True
lock . release ()
GIL (Global Interpreter Lock)
GIL — способ синхронизации потоков, который используется в некоторых интерпретируемых языках программирования, например в Python и Ruby.
Задача
Реализовать поиск максимального числа внутри подмассива, использую Thread
Работа с модулем multiprocessing
Модуль multiprocessing был добавлен в Python версии 2.6. Изначально он был определен в PEP 371 Джесси Ноллером и Ричардом Одкерком. Модуль multiprocessing позволяет вам создавать процессы таким же образом, как при создании потоков при помощи модуля threading. Суть в том, что, в связи с тем, что мы теперь создаем процессы, вы можете обойти GIL (Global Interpreter Lock) и воспользоваться возможностью использования нескольких процессоров на компьютере.
Пример программы
import os
from multiprocessing import Process
if __name__ == '__main__' :
numbers = [ 5 , 10 , 15 , 20 , 25 ]
procs = []
for index , number in enumerate ( numbers ):
proc = Process ( target = doubler , args =( number ,))
procs . append ( proc )
proc . start ()
for proc in procs :
proc .join()
Когда что использовать?
1. Потоки обмениваются данными по умолчанию; процессов нет.
2. Как следствие (1), отправка данных между процессами обычно требует травления и рассыпания.
3. Как еще одно следствие (1), прямое совместное использование данных между процессами обычно требует ввода его в низкоуровневые форматы типа Value, Array и ctypes .
4. Процессы не подпадают под действие GIL.
5. На некоторых платформах (в основном Windows) процессы намного дороже создавать и уничтожать.
6. Существуют некоторые дополнительные ограничения на процессы, некоторые из которых различны на разных платформах. Подробнее см. Руководство по программированию.
7. Модуль threading передачи не имеет некоторых функций multiprocessing модуля. (Вы можете использовать multiprocessing.dummy чтобы получить большую часть недостающего API поверх потоков, или вы можете использовать модули более высокого уровня, такие как concurrent.futures и не беспокоиться об этом.)
Вопросы к собеседованиям
Какие задачи хорошо параллелятся, какие плохо?
Нужно посчитать 100 уравнений. Делать это в тредах или нет?
Треды в Питоне — это нативные треды или нет?
Асинхронное программирование
Асинхронность в программировании — выполнение процесса в неблокирующем режиме системного вызова, что позволяет потоку программы продолжить обработку.
Асинхронный код убирает блокирующую операцию из основного потока программы, так что она продолжает выполняться, но где-то в другом месте, а обработчик может идти дальше. Проще говоря, главный "процесс" ставит задачу и передает ее другому независимому "процессу"
Работа синхронной и асинхронной программы
Асинхронное программирование в Python
Изначально в Python для решения задач асинхронного программирования использовались корутины, основанные на генераторах. Потом, в Python 3.4, появился модуль asyncio (иногда его название записывают как async IO), в котором реализованы механизмы асинхронного программирования. В Python 3.5 появилась конструкция async/await.
Для того чтобы заниматься асинхронной разработкой на Python, нужно разобраться с парой понятий. Это — корутины (coroutine) и задачи (task).
Корутины
Обычно корутина — это асинхронная (async) функция. Корутина может быть и объектом, возвращённым из корутины-функции.
Если при объявлении функции указано то, что она является асинхронной, то вызывать её можно с использованием ключевого слова await:
await say_after(1, ‘hello’)
Корутины через yield
def double ():
print ( ' Начало функции' )
value = 2 * ( yield )
print ( ' value = {}' . format (value))
yield value
print ( ' Конец функции' )
d = double()
next (d)
d.send( 21 )
d.send( 42 )
Задачи
Задачи позволяют запускать корутины в цикле событий. Это упрощает управление выполнением нескольких корутин. Вот пример, в котором используются корутины и задачи. Обратите внимание на то, что сущности, объявленные с помощью конструкции async def — это корутины. Этот пример взят из официальной документации Python.
import asyncio
import time loop = asyncio.create_event_loop() async def say_after ( delay , what ):
await asyncio.sleep(delay)
print (what) async def main ():
task1 = asyncio.create_task(
say_after( 1 , 'hello' )) task2 = asyncio.create_task(
say_after( 2 , 'world' )) print ( f "started at {time.strftime( '%X' )} " ) # Ждём завершения обеих задач (это должно занять
# около 2 секунд.)
await task1
await task2 print ( f "finished at {time.strftime( '%X' )} " )
asyncio.run(main())
Пример работы синхронного и асинхронного вызовов
Asynchronous:
Task 1 done
Task 4 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
Task 3 done
Task 2 done
Task 5 done
Synchronous:
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
Aiohttp
Самая популярная блокирующая задача — получение данных по HTTP-запросу. Рассмотрим работу с великолепной библиотекой aiohttp на примере получения информации о публичных событиях на GitHub.
async def aparse_github ():
async with aiohttp . ClientSession () as session :
async with session . get ( URL ) as response :
print ( "Status:" , response . status )
print ( "Content-type:" , response . headers [ 'content-type' ])
html = await response . text ()
print ( "Body:" , html [: 15 ], "..." )
async def main ():
start = time . time ()
tasks = [ asyncio . ensure_future (
aparse_github ()) for i in range ( 1 , MAX_CLIENTS + 1 )]
await asyncio . wait ( tasks )
print ( time . time () - start )
Задача
Реализовать получение данных пользователей с сайта https://jsonplaceholder.typicode.com/
Реализовать получение данных при помощи синхронных и асинхронных функций
Flask
Микрофреймворк для создания веб приложений
pip install flask
Пример
from flask import Flask
app = Flask (__name__)
@ app .route ( '/' )
def index ():
return 'Hello World'
if __name__ == "__main__" :
app . run ()
Добавление пути
@app.route(‘/’)
def index (): return 'Hello World’
app . add_url_rule ( '/' , 'index' , index )
Задача
Реализовать homepage с небольшой информацией о себе