Как использовать несколько ядер процессора python
Перейти к содержимому

Как использовать несколько ядер процессора python

  • автор:

Процессы и потоки

Процесс — это программа, которая запущена в оперативной памяти компьютера. Другими словами, процесс — это набор инструкций, которые выполняются последовательно.

Одно из назначений процесса — изолировать программы одну от другой. Это реализуется путем того, что процессы не имеют доступа к адресным пространствам друг друга (процесс А не может залезть в память процесса Б и что-то там перезаписать).

Каждый процесс, запущенный в ОС имеет свои характеристики.

  • PID — идентификатор процесса
  • Объем оперативной памяти. Каждый процесс запрашивает у ОС некоторый объем оперативной памяти.
  • Стек. Он используется для вызова функций, для создания локальных переменных этих функций.
  • Список открытых файлов.
  • Стандартный ввод/вывод.

Несколько процессов на одном ядре CPU (вытесняющая многозадачность)

Чтобы посмотреть список запущенных процессов, существует команда top . Эта команда нам покажет, какие процессы запущены в данный момент. Если мы введем эту команду на машине с одним единственным ядром CPU, то увидим, что на ней «одновременно» выполняется достаточно большое количество процессов. Разберем, как одно ядро обрабатывает «одновременно» несколько процессов.

На самом деле процессы выполняются не одновременно, а последовательно: сначала работает один процесс, потом контекст переключается и работает второй процесс, потом контекст переключается и работает третий процесс и т.д. Эти переключения происходят очень быстро (так как тактовая частота современных процессоров достаточно высока) и нам кажется, что процессы работают одновременно.

Такой вид работы называется конкурентностью или вытесняющей многозадачностью, так как задачи разделяют доступ к одним и тем же ресурсам. В нашем примере процессы конкурируют за процессорное время ядра CPU. Этот доступ должен как-то регулироваться. Регулированием этого доступа занимается операционная система. Она выделяет каждой задаче небольшие интервалы времени каждой задаче и выполняет переключение между задачами.

Важно, что чем больше переключений происходит, тем больше времени тратится именно на переключение контекста. Из этого можно сделать важный вывод, что на одноядерной машине программа, спроектированная для последовательного выполнения (имеются в виду CPU bound задачи) будет работать быстрее, чем программа, спроектированная для параллельного выполнения.

На заре персональных компьютеров тактовые частоты измерялись десятками мегагерц, потом частоту увеличили до сотен мегагерц, а потом и до тысяч мегагерц (гигагерцы). Тактовая частота современных ядер достигает нескольких гигагерц.

Многоядерные процессоры

Дальнейшее увеличение тактовой частоты приводит к сильному удорожанию производства, а увеличивать производительность нужно. Поэтому инженеры придумали делать многоядерные процессоры.

В многоядерных процессорах каждое ядро в каждый момент времени может выполнять свой процесс. Таким образом, на многоядерных процессорах действительно возможно, что одновременно выполняется несколько процессов. Чем больше физическое количество ядер, тем больше одновременных задач наш компьютер может выполнять. Таким образом настоящие параллельные вычисления могут выполняться только на многоядерной машине.

Поток

Поток (thread) — это непосредственно поток выполнения инструкций программы. Это последовательность операций, выполнением которых на процессоре управляет операционная система.

У каждого процесса есть как минимум один поток (главный поток). В этом потоке исполняются инструкции, написанные в программе. При этом программа может создавать дополнительные потоки и распараллеливать вычисления на разных потоках выполнения.

Все потоки выполнения в рамках одного процесса разделяют ресурсы (например оперативную память). В этом есть преимущества и недостатки. К преимуществам можно отнести, что мы можем относительно несложной логикой поделить вычисления в несколько потоков, если разные потоки лягут на разные процессорные ядра, мы можем ускорить работу всей системы (в Python такой подход не работает из-за GIL). Основной проблемой разделения ресурсов является состояние гонки (race condition). Суть проблемы в том, что логика выполнения программы начинает зависеть от порядка выполнения кода в разных потоках. Например, один поток может читать какие-то данные, а второй поток в это же время изменять эти данные. Первый поток может не учитывать, что данные уже изменились вторым потоком. Такие проблемы сложно отслеживать и исправлять.

Еще одним недостатком является трудность в масштабировании. Например, в web-приложении нам могут поступать тысячи запросов одновременно. На обработку каждого запроса нам хотелось бы создавать отдельный поток, но количество ядер у нас ограничено (несколько штук или несколько десятков). Тысяча потоков на восьми ядрах будет работать очень медленно, а увеличивать количество ядер — это очень дорого и неэффективно.

Виды потоков

Потоки делятся на два вида:

  • foreground
  • background (daemon)

Как было сказано выше, у каждого процессе есть по крайней мере один поток (главный поток). Этот поток имеет тип foreground. До тех пор, пока жив хотя бы один foreground поток, программа будет продолжать работу, а процесс будет висеть в памяти.

Если в программе завершились все foreground потоки, то она не будет ждать завершения background потоков, они будут завершены принудительно.

В Python потоки представлены классом Thread пакета threading .

from threading import Thread

t = Thread(target)
t.start()

target — это функция, которая будет выполняться в отдельном потоке.

После вызова t.start() , функция target будет вызвана в новом потоке. Текущий поток продолжит выполнение. В результате эти два потока будут выполняться конкурентно.

Если мы в каком-то месте программы хоти дождаться пока созданный поток завершится, нужно вызвать метод join у объекта потока. join — это блокирующий вызов. Поток, в котором произошел вызов будет заблокирован до тех пор, пока не завершится нужный поток.

t.join() # ждем, пока завершиться запущенный поток

По умолчанию создается foreground поток. Чтобы создать background поток, нужно передать в конструктор класса Thread параметр daemon = True .

t = Thread(target, daemon=True)

Выбирая тип создаваемого потока (foreground или background), следует помнить, что программа не будет завершена, пока не завершаться все foreground потоки. Как правило, тип foreground следует задавать только тем потокам, которые по значимости конкурируют с основным потоком программы. В большинстве случаев foreground поток должен быть всего один.

Hyper-Threading

Вернемся к схеме с одним процессорным ядром. В каждый конкретный момент это ядро выполняет какой-то один процесс, а точнее поток (так как у одного процесса может быть несколько потоков). В каждом ядре процессора есть много блоков, и когда в кокой-то определенный момент времени ядро выполняет поток, часть этих блоков может простаивать. Инженеры придумали занимать блоки, которые простаивают во время выполнения одного потока, другими потоками. В компании Intel такая технология называется Hyper-Threading.

Надо понимать, что если поток выполняет какие-то сложные операции то свободных ресурсов остается немного или может не остаться совсем. В этом случае Hyper-Threading может быть бесполезен.

Операционная система видит логические ядра. Например, если у процессора в характеристиках стоит: 6 ядер, 12 потоков. Это означает, что физических ядер 6, а логических — 12. Операционная система будет видеть 12 ядер. Разумеется, логическое ядро менее эффективно, чем физическое ядро.

В итоге, технология Hyper-Threading позволяет увеличить производительность системы на несколько десятков процентов.

Приоритеты процессов и потоков

У процессов и потоков есть приоритеты. Эти приоритеты выставляются на уровне операционной системы. При этом управлять приоритетом процесса из Python можно, а приоритетом потока — нельзя (из-за GIL).

  • IDLE_PRIORITY_CLASS — самый низкий приоритет
  • BELOW_NORMAL_PRIORITY_CLASS
  • NORMAL_PRIORITY_CLASS
  • ABOVE_NORMAL_PRIORITY_CLASS
  • HIGH_PRIORITY_CLASS
  • REALTIME_PRIORITY_CLASS — самый высокий приоритет
  • THREAD_PRIORITY_IDLE — самый низкий приоритет
  • THREAD_PRIORITY_LOWEST
  • THREAD_PRIORITY_BELOW_NORMAL
  • THREAD_PRIORITY_NORMAL
  • THREAD_PRIORITY_ABOVE_NORMAL
  • THREAD_PRIORITY_HIGHEST
  • THREAD_PRIORITY_TIME_CRITICAL — самый высокий приоритет

GIL

Как было сказано выше, потоки позволяют эффективно задействовать ресурсы компьютера. Например, у нас есть 4х-ядерный процессор и можно написать программу, которая создаст несколько потоков (например 4), каждый поток ляжет на отдельное ядро процессора и все потоки будут выполняться одновременно (ну или имеют потенциальную возможность выполнятся одновременно).

В Python (в реализации CPython) эта схема работает не так, потому что в Python есть GIL.

GIL — Global Interpreter Lock.

GIL блокирует работу интерпретатора и одновременное выполнение нескольких потоков. Потоки всегда выполняются последовательно (вытесняющая многозадачность) и наличие нескольких ядер никак на это не влияет.

Из-за этого в Python невозможно эффективное использование при помощи потоков всех процессорных ядер. Наоборот, попытка «распараллелить» вычисления на несколько ядер в CPU-bound задаче приведет к замедлению выполнения программы.

GIL представлен булевой переменной, к которой поток должен получить доступ, прежде чем начать исполнение кода. Только один поток может иметь доступ к GIL в любой момент времени. Получается что в любой момент времени может работать только один поток. В процессе есть только один GIL и к нему все потоки стремятся получить доступ, чтобы исполнить код. В рамках процесса обойти этот механизм нельзя.

Зачем так было сделано? Дело в том, что GIL повышает производительность однопоточных приложений.

Пока никто не придумает модель без GIL без ущерба производительности однопоточных приложений — GIL будет жить. Гвидо Ван Россум.

Для реального распараллеливания вычислений придется порождать процессы (со своей собственной инстанцией интерпретатора). Разумеется, создание процесса имеет стоимость.

В последнее время проявились проекты по решению ограничения GIL — создание нескольких интерпретаторов в рамках одного процесса (это дешевле, чем создавать отдельные процессы).

Асинхронность (кооперативная многозадачность)

Мы выяснили, что многопоточные приложения плохо масштабируются. Однако в современных web-приложениях умение масштабироваться очень важно и поэтому был придуман асинхронный подход.

Асинхронность работает в рамках одного процесса и одного потока.

Одним предложением асинхронность можно охарактеризовать так: это возникновение событий, независимых от основного потока программы и способы обработки таких событий.

Покажем отличие асинхронного подхода от многопоточного на примере веб сервиса. У этого сервиса есть эндпоинт, который обрабатывает запросы клиентов по http. Этот эндпоинт отвечает примерно за 300 мс. Если проанализировать время выполнения эндпоинта, то выяснится, что много времени тратится на ожидание: ожидание ответа от базы данных, от сторонних API, чтение-запись данных на диск и т.п. Например, из 300 мс, мы ожидаем 200 мс.

Допустим, мы запустили наш сервис на 8-ми ядерном процессоре. При многопоточном подходе мы можем обрабатывать до восьми запросов одновременно. При увеличении количества запросов, начнет накапливаться очередь и чем больше клиентов, тем больше очередь. Чем больше очередь, тем больше время ожидания, вплоть до превышения времени ожидания ответа от сервера.

Получается, что в синхронной схеме на 8-ми ядерном процессоре наш сервис может обрабатывать порядка 10 одновременных запросов. А это совсем немного. Хочется сотни или тысячи запросов.

Разберемся, теперь как работает асинхронный подход. Мы помним, что из 300 мс, за которые отвечает сервер, 200 мс мы ждем. За это время ожидания мы можем обрабатывать запросы других клиентов. У нас есть только один процесс, в рамках которого работает один поток. Этот поток обрабатывает входящие http-запросы. Когда в рамках текущего http-запроса нужно отправить запрос в базу данных, запрос отправляется и вместо того, чтобы заблокировать выполнение потока и просто ожидать ответа от БД, наш поток обрабатывает следующий http-запрос. В рамках второго http-запроса также отправляется запрос в БД, и поток начинает обрабатывать третий http-запрос и так далее. Получается, что один поток может обрабатывать огромное количество клиентов. При это ничего не мешает запустить несколько процессов нашей программы.

Виды процессинга

Существуют два вида процессинга:

  • CPU-bound — операции, которые требуют много процессорного времени
  • I/O-bound — операции, которые «привязаны» к «медленным» устройствам (диск, сеть и т.п.)

Для CPU-bound задач используется вытесняющая многозадачность. Для I/O-bound задач используется кооперативная многозадачность.

CPU-bound задачи могут требовать параллелизацию и тогда нужно использовать многоядерность. Если параллелизации не требуется и достаточно одного ядра с time-slicing (concurrency).

В Python для каждого вида процессинга предусмотрен подход:

Вид процессинга Как осуществляется Параллелизация Модуль Python
CPU-bound (concurrency) Переключение между потоками через time-slicing Нет / 1 ядро threading
CPU-bound (parallelization) Одновременное исполнение N-процессов (задач) Да / N ядер multiprocessing
I/O-bound Кооперация Нет / 1 ядро asyncio

Многопоточность в Python. Библиотеки threading и multiprocessing.

Процесс — исполняемый экземпляр какой-либо программы. Каждый процесс состоит из следующих элементов:

  • образ машинного кода;
  • область памяти, в которую включается исполняемый код, данные процесса (входные и выходные данные), стек вызовов и куча (для хранения динамически создаваемых данных);
  • дескрипторы операционной системы (например, файловые дескрипторы);
  • состояние процесса.

В целях стабильности и безопасности, в современных операционных системы каждый процесс имеет прямой доступ только с своим собственным ресурсам. Доступ к ресурсам другого процесса возможен через межпроцессное взаимодействие (например, посредством файлов, при помощи именованных и неименованных каналов и другие).

Сам процесс может быть разделен на так называемые потоки. Поток (поток выполнения, thread) — наименьшая единица обработки, исполнение которой может быть назначено ядром операционной системы. В отличии от нескольких процессов, потоки существуют внутри одного процесса и имеют доступ к ресурсам этого процесса. Каждый поток обладет собственным набором регистров и собственным стеком вызова, но доступ к ним имеют и другие потоки.

Sharing data between threads

При работе с потоками стоит учесть несколько моментов:

  • одно ядро процессора в один момент может исполнять только один поток;
  • потоки одного процесса могут исполняться физически одновременно (на разных ядрах);
  • бессмысленно порождать потоков больше, чем у вас есть ядер.

Потоки имеют несколько применений. Первое — ускорение работы программы. Ускорение достигается за счет параллельного выполнения независимых друг от друга вычислений. Например, при численном интегрировании область интегрирования может быть разбита на 3 участка. На каждый участок создается свой поток, в котором численно вычислется интеграл для конкретного участка. Второе — независимое исполнение операций. Отличие этого случая от первого хорошо видно на следующем примере. Пусть есть приложение с графическим интерфейсом, где весь код выполняется в одном потоке. При выполнении какой-нибудь долгой операции (например, копирование файла) интерфейс приложения просто перестанет отвечать до тех пор, пока долгий процесс не завершится. В таком случае в один поток помещается работа графического интерфейса, в другой — остальные вычисления. В таком случае интерфейс позволит проводить другие операции даже во время выполнения долгой операции в другом потоке (например, заполнение прогресс бара в процессе копирования файла).

threading

В Python работа с потоками осуществляется при помощи стандартной библиотеки threading. В библиотеке представлен класс Thread для создания потока выполнения. Задание исполняемого кода в отдельном потоке возможно двумя способами:

  • передача исполняемого объекта (функции) в конструктор класса;
  • переопределение функции run() в классе-наследнике.

После того, как объект создан, поток запускается путем вызова метода start(). Рассмотрим простой пример:

import threading import sys def thread_job(number): print('Hello <>'.format(number)) sys.stdout.flush() def run_threads(count): threads = [ threading.Thread(target=thread_job, args=(i,)) for i in range(0, count) ] for thread in threads: thread.start() # каждый поток должен быть запущен for thread in threads: thread.join() # дожидаемся исполнения всех потоков run_threads(4) print(finish) 

Конструктор класса Thread имеет следующие аргументы:

  • group должно быть None; зарезервировано для будующих реализаций Python 3;
  • target является исполняемым объектом (по умолчанию равен None, ничего не исполняется);
  • name обозначет имя потока (по умолчанию имя генерируется автоматически);
  • args — кортеж аргументов для исполняемого объекта;
  • kwargs — словарь именованных аргументов для исполняемого объекта;
  • daemon равное True обозначет служебный поток (служебные потоки завершаются принудительно при завершении процесса); по умолчанию False.

В Python выполнение программы заканчивается, когда все неслужебные потоки завершены. Модифицировав программу выше, мы все еще получим корректно работающий код:

import threading import sys import time def thread_job(number): time.sleep(2) # "усыпляем" поток на 2 сек print('Hello <>'.format(number)) sys.stdout.flush() def run_threads(count): threads = [ threading.Thread(target=thread_job, args=(i,)) for i in range(1, count) ] for thread in threads: thread.start() # каждый поток должен быть запущен run_threads(1) print(finish) 

Как можно увидеть, программа завершается без ошибок (с кодом 0), но теперь строка «finish» печатается раньше строки «Hello 0», т.к. главный поток теперь не ждет завершения работы других потоков. Метод join() используется для блокирования исполнения родительского потока до тех пор, пока созданный поток не завершится. Это нужно в случаях, когда для работы потока-родителя необходим результат работы потока-потомка. Вспомним пример с численным интегрированием. Вычисление итогового значения интеграла выполняется в главном потоке, но это возможно только после завершения вычислений в побочных потоках. В таком случае главный поток нужно просто приостановить до тех пор, пока не завершатся все побочные потоки. Метод join() может принимать один аргумент — таймаут в секундах. Если таймаут задан, join() бликирует работу на указанное время. Если по истечении времени ожидаемый поток не будет завершен, join() все равно разблокирует работу потока, вызвашего его. Проверить, исполняется ли поток можно методом is_alive(). Подробнее ознакомиться с функционалом библиотеки можно в официальной документации по threading.

Упражнение №1

Запустите следующий код. В чем проблема данного кода? Всегда ли counter = 10 после исполнения кода программы?

import threading import sys def thread_job(): global counter old_counter = counter counter = old_counter + 1 print('<> '.format(counter), end='') sys.stdout.flush() counter = 0 threads = [threading.Thread(target=thread_job) for _ in range(10)] for thread in threads: thread.start() for thread in threads: thread.join() print(counter) 

Демонстрация «проблемности» кода:

import threading import random import time import sys def thread_job(): global counter old_counter = counter time.sleep(random.randint(0, 1)) counter = old_counter + 1 print('<> '.format(counter), end='') sys.stdout.flush() counter = 0 threads = [threading.Thread(target=thread_job) for _ in range(10)] for thread in threads: thread.start() for thread in threads: thread.join() print(counter) 

Почему так происходит? Есть несколько возможных решений этой проблемы.

import threading import random import time import sys def thread_job(): lock.acquire() # mutex global counter old_counter = counter time.sleep(random.randint(0, 1)) counter = old_counter + 1 print('<> '.format(counter), end='') sys.stdout.flush() lock.release() lock = threading.Lock() counter = 0 threads = [threading.Thread(target=thread_job) for _ in range(10)] for thread in threads: thread.start() for thread in threads: thread.join() print(counter) 
import threading import random import time import sys def thread_job(): with lock: global counter old_counter = counter time.sleep(random.randint(0, 1)) counter = old_counter + 1 print('<> '.format(counter), end='') sys.stdout.flush() lock = threading.Lock() counter = 0 threads = [threading.Thread(target=thread_job) for _ in range(10)] for thread in threads: thread.start() for thread in threads: thread.join() print(counter) 

Вариант с контекстным менеджером более предпочтителен. Вспомните работу с файлами при помощи with. По завершении with файл автоматически закрывался. В данном случае похожая ситуация. Для того, чтобы запретить нескольким потокам параллельно выполнять некоторые участки кода, мы используем Lock (в UNIX системах более известен как мьютекс (mutex)). Мьютекс может быть в двух состояниях: свободен и заблокирован. Если какой-либо поток пытается заблокировать уже заблокированный мьютекс, то поток блокируется до тех пор, пока мьютекс не освободится. Причем если несколько потоков претендует на блокирование мьютекса, то потоки просто выстраиваются в очередь. Главная проблема — не освобожденный мьютекс. Отсутствие строчки lock.release() может повесить остальные потоки в бесконечное ожидание. Контекстный менеджер позволит избежать этой проблемы. Как только он закончится, все захваченные им ресурсы будут освобождены, в том числе мьютекс.

Упражнение №2

Иногда бывает нужно узнать доступность набора ip адресов. Неэффективный вариант представлен ниже.

Реализуйте то же самое, но используя threading.

import os, re received_packages = re.compile(r"(\d) received") status = ("no response", "alive but losses", "alive") for suffix in range(20, 30): ip = "192.168.178." + str(suffix) ping_out = os.popen("ping -q -c2 " + ip, "r") # получение вердикта print(". pinging ", ip) while True: line = ping_out.readline() if not line: break n_received = received_packages.findall(line) if n_received: print(ip + ": " + status[int(n_received[0])]) 

Global Interpreter Lock (GIL)

CPython — популярная реализация интерпретатора — имеет встроенный механизм, который обеспечивает выполнение ровно одного потока в любой момент времени. GIL облегчает реализацию интерпретатора, защищая объекты от одновременного доступа из нескольких потоков. По этой причине, создание несколько потоков не приведет к их одновременному исполнению на разных ядрах процессора.

GIL visualisation

Однако, некоторые модули, как стандартные, так и сторонние, созданы для освобождения GIL при выполнении тяжелых вычислительных операций (например, сжатие или хеширование). К тому же, GIL всегда свободен при выполнении операций ввода-вывода.

Упражнение №3

Написать программу, которая будет находить сумму чисел массива с использованием N потоков. Запустить с разным параметром N. Убедиться, что несмотря на увеличение N, ускорения подсчета не происходит. Причина этому — GIL. В Python вычисления распараллеливать бессмысленно. Замерить время работы можно с помощью библиотеки time (ответ в секундах):

start = time.time() # код, время работы которого надо замерить print(time.time() - start) 

Упражнение №4

Запустите на исполнение, замерив время работы. Перепишите с помощью потоков и опять замерьте время.

import urllib.request import time urls = [ 'https://www.yandex.ru', 'https://www.google.com', 'https://habrahabr.ru', 'https://www.python.org', 'https://isocpp.org', ] def read_url(url): with urllib.request.urlopen(url) as u: return u.read() start = time.time() for url in urls: read_url(url) print(time.time() - start) 

Потоки очень уместны, если в коде есть блокирующие операции (ввод-вывод, сетевые взаимодействия). Также, удобно разбивать логические процессы по потокам (анимация, графический интерфейс, и тд).

multiprocessing

Библиотека multiprocessing позволяет организовать параллелизм вычислений за счет создания подпроцессов. Т.к. каждый процесс выполняется независимо от других, этот метод параллелизма позволяет избежать проблем с GIL. Предоставляемый библиотекой API схож с тем, что есть в threading, хотя есть уникальные вещи. Создание процесса происходит поутем создания объекта класса Process. Аргументы конструктора аналогичны тем, что есть в конструкторе Thread. В том числе аргумент daemon позволяет создавать служебные процессы. Служебные процессы завершаются вместе с родительским процессом и не могут порождать свои подпроцессы.

Простой пример работы с библиотекой:

from multiprocessing import Process def f(name): print('hello', name) if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() p.join() 

Чтобы убедить, что каждый процесс имеет свой ID, запустите пример:

from multiprocessing import Process import os def info(title): print(title) print('module name:', __name__) print('parent process:', os.getppid()) print('process id:', os.getpid()) def f(name): info('function f') print('hello', name) if __name__ == '__main__': info('main line') p = Process(target=f, args=('bob',)) p.start() p.join() 

Старайтесь не забывать про конструкцию __name__ == ‘__main__’ . Это надо для того, чтобы ваш модуль можно было безопасно подключать в другие модули и при этом не создавались новые процессы без вашего ведома.

Упражнение №5

Запустите код. Попробуйте объяснить, почему LIST — пуст.

import multiprocessing def worker(): LIST.append('item') LIST = [] if __name__ == "__main__": processes = [ multiprocessing.Process(target=worker) for _ in range(5) ] for p in processes: p.start() for p in processes: p.join() print(LIST) 

Общение между процессами

multiprocessing предоставляет два вида межпроцессного обмена данными: очереди и каналы данных (pipe).

Очереди (класс Queue) аналогичны структуре данных «очередь», рассмотренной вами в курсе алгоритмов.

from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # выводит "[42, None, 'hello']" p.join() 

Класс Pipe отвечает за канал обмена данными (по умолчанию, двунаправленный), представленный двумя концами, объектами класса Connection. С одним концом канала работает родительский процесс, а с другим концом — подпроцесс.

from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # выводит "[42, None, 'hello']" p.join() 

Еще один вид обмена данными может быть достигнут путем записи/чтения обычных файлов. Чтобы исключить одновременную работу двух процессов с одним файлом, в библиотеке есть классы аналогичные threading.

from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start() 

Подробнее ознакомиться с функционалом библиотеки можно в официальной документации по multiprocessing.

Класс Pool в multiprocessing

Класс Pool — удобный механизм распараллеливания выполнения функций, распределения входных данных по процессам и т.д.

Наиболее интересные функции: Pool.apply, Pool.map, Pool.apply_async, Pool.map_async.

apply, map работают аналогично питоновским built-in apply, map.

Как работает Pool можно понять на примере:

from multiprocessing import Pool def cube(x): return x**3 if __name__ == "__main__": pool = Pool(processes=4) # создаем пул из 4 процессов # в apply можно передать несколько аргументов results = [pool.apply(cube, args=(x,)) for x in range(1,7)] # раскидываем числа от 1 до 7 по 4 процессам print(results) pool = Pool(processes=4) # то же самое, но с map. разбивает итерируемый объект (range(1,7)) на chunks и раскидывает аргументы по процессам results = pool.map(cube, range(1,7)) print(results) 

map, apply — блокирующие вызовы. Главная программа будет заблокирована, пока процесс не выполнит работу.

map_async, apply_async — неблокирующие. При их вызове, они сразу возвращают управление в главную программу (возвращают ApplyResult как результат). Метод get() объекта ApplyResult блокирует основной поток, пока функция не будет выполнена.

pool = mp.Pool(processes=4) results = [pool.apply_async(cube, args=(x,)) for x in range(1,7)] output = [p.get() for p in results] print(output) 
Упражение №6*

Для этого упражнения скачайте архив viterbi_mp.zip с кодом и необходимыми данными.

Рассмотрим следующую задачу. Положение мобильного робота на двумерной карте может быть представлено тремя числами: x, y и направлением θ. Точное положение робота нам не известно. В связи с этим мы строим N гипотез о его пложении, сумма их вероятностей равна 1. В процессе движения робота некоторые гипотезы исчезали, а некоторые порождали новые. Однако в каждый момент времени количество гипотез — константа. Известно, какая гипотеза из какой была порождена.

Представленный (и слегка упрощенный) выше метод оценки положения робота множеством гипотез называется фильтром частиц, а сами гипотезы называются частицами. Фильтр частиц используется для оценки положения робота в процессе его движения. Вспомним, что в процессе работы некоторые частицы погибают, а некоторые порождают другие. Переходы между частицами образуют граф перехода. Используя этот граф, можно оценить траекторию робота с некоторой точностью.

Задача: необходимо восстановить траекторию движения робота. Есть несколько способов приближенно решить данную задачу. Один из способов — восстановить наиболее вероятную траекторию. Для этого воспользуемся алгоритмом Витерби, одним из алгоритмов динамического программирования.

Пусть у нас было T моментов времени. На каждом моменте времени t мы для каждой частицы, существующей в момент времени t, выбираем наиболее вероятный переход из какой-нибудь частицы с момента времени t-1. Тогда ответом будет — argmax по вероятности среди всех частиц в последний момент времени. Однако, сам алгоритм довольно медленный. Его асимптотика O(T * N^2) .

В архиве вам предоставлен код в файле generate_viterbi_trajectory.py . Однако, он написан без распараллеливания. Ваша задача — распараллелить код, используя multiprocessing. Файл graph.ldj представляет собой текстовый файл, где каждая строка в формате JSON. Каждая строка представляет собой один момент времени. В этом задании вам предлагаются первые 10 моментов времени движения робота. В каждый момент времени количество частиц N = 2000 . Файл localization_config.json — файл конфигурации, содержащий параметры с которыми происходила генерация графа. Файл true_trajectory.json содержит массив троек чисел (x, y, θ), построенный нераспараллеленым алгоритмом. Вам надо будет сравнить полученную вами траекторию с данной при помощи скрипта correspond_trajectories.py . Для тех, кто хочет попробовать свой код на больших данных, используйте файл full_graph.ldj , который содержит порядка 1700 строк. Архив с файлом.

Не забудьте замерить время работы. Примерное время работы на моем компьютере для 10 строк в 1 процесс — 300 сек.

Сайт построен с использованием Pelican. За основу оформления взята тема от Smashing Magazine. Исходные тексты программ, приведённые на этом сайте, распространяются под лицензией GPLv3, все остальные материалы сайта распространяются под лицензией CC-BY.

Как использовать все процессоры при записи данных в файл используя python?

У меня есть функция код, который по определенной логике записывает данные в Базу данных. Я считываю данные с 2-х файлов построчно и на основании этих данных создаю запись в Базе Данных. Как БД использую ESRI geodatabase. Но, проблема с том, что для работы мы используем 1 ядро и тратим очень много времени на выполнения этого кода. У меня же 96 ядер и я хочу сэкономить время при использовании всех ядер. Это можно сделать через распаралеливание процессов. Вот мой код: Создаю БД и таблицу в ней:

 arcpy.CreateFileGDB_management("C:\Users\ivan\CellRebell\ESRI_New_Zeland", "%s.gdb"%Island) result = arcpy.management.CreateFeatureclass( "C:\Users\ivan\CellRebell\ESRI_New_Zeland/%s.gdb"%Island, "esri_square", "POLYGON", spatial_reference=4326) feature_class = result[0] arcpy.AddField_management(feature_class, 'ID', 'TEXT') 

Записываю данные в таблицу:

with arcpy.da.InsertCursor(feature_class, ['ID','SHAPE@']) as cursor: with open('%s_long_double.txt'%Island, 'r') as long: for i in long: i_1, i_2 = i.split() with open('%s_short_double.txt'%Island, 'r') as short: for k in short: k_1, k_2 = k.split() coord = [[float(i_1),float(k_1)],[float(i_2),float(k_1)],[float(i_2),float(k_2)],[float(i_1),float(k_2)]] count+=1 row = ['%s_%s'%(Island, count), coord] cursor.insertRow(row) del cursor 

Как использовать все ядра для записи данных в файл? Скрипт написан на Python 2.7. Основная задача, которую я хочу решить — это ускорить процесс записи данных в таблицу через использования всех процессоров. Спасибо

Как работать с многоядерными процессорами в Python?

Собственно правильно ли я понимаю, что это вообще невозможно, так как из-за GIL в Python все потоки все равно будут делаться последовательно одним процессором? Актуально ли это для Python 3.4? Как обойти (писать код на C)?

  • Вопрос задан более трёх лет назад
  • 13728 просмотров

Комментировать
Решения вопроса 1

https://docs.python.org/2/library/multiprocessing.html — единственный известный мне способ утилизровать несколько ядер на питоне. GIL есть в обоих ветках питона, обойти никак (вроде, если я не ошибаюсь, написав экстеншн на С, Вы все равно будете вынуждены запускать его в тех же условиях).

Ответ написан более трёх лет назад
Нравится 6 2 комментария
iegor @iegor Автор вопроса

Т.е. для использования нескольких процессоров необходимо ветвить процессы, а не потоки? Модули subprocess и threading не подходят?

iegor: subprocess тоже позволит использовать несколько ядер процессора, но он используется для исполнения исполняемых файлов + имеет блокирующее апи (мультипроцессинг же позволяет выполнить определенную функцию, например, что куда удобней именно для целей распараллеливания). Т. е. если Вы в своем процессе вызываете сабпроцесс, то текущий процесс будет заблокирован ожиданием ответа, но в данном случае вам как раз поможет threading или какой-нибуль gevent. Вообще это довольно сложная тема. Вкратце попытаюсь объяснить: GIL не настолько портит жизнь как Вам кажется. К примеру, у вас есть задача спарсить 100 сайтов. Вы в главном потоке создаете через threading 4 потока и в каждый из них отправляете по 25 сайтов на парсинг. Управление получает первый поток — он вызывает какой-нибудь requests.get, который является блокирующей операцией. Соответственно текущий поток начинает ждать ответа. GIL в этот момент освобождает лок и другой поток начинает свое выполнение. И так повторяется для каждого потока. Получается пока происходит блокирующая операция в одном потоке, второй может работать. Естественно все это происходит только на одном ядре и не является полноценным мултитридингом, но если писать аккуратно и вдумчиво, то на блокирующих операциях даже при GIL с threading можно получить прирост производительности. Ну а вообще примите как факт, что питон не умеет утилизировать многоядерные процессы и жизнь станет проще 🙂

Ответы на вопрос 6

Sly_tom_cat

Sly_tom_cat . @Sly_tom_cat

from time import time
from threading import Thread
from multiprocessing import Process

def count(n):
while n > 0:
n -= 1

startTime = time()
count(100000000)
count(100000000)
print(‘\nSequential execution time : %3.2f s.’%(time() — startTime))

startTime = time()
t1 = Thread(target=count, args=(100000000,))
t2 = Thread(target=count, args=(100000000,))
t1.start(); t2.start()
t1.join(); t2.join()
print(‘\nThreaded execution time : %3.2f s.’%(time() — startTime))

startTime = time()
p1 = Process(target=count, args=(100000000,))
p2 = Process(target=count, args=(100000000,))
p1.start(); p2.start()
p1.join(); p2.join()
print(‘\nMultiprocessed execution time : %3.2f s.’%(time() — startTime))

Дает на 4-х ядерном проце:

Sequential execution time : 6.83 s.

Threaded execution time : 11.37 s.

Multiprocessed execution time : 6.30 s.

Но допустим распараллеливание запросов к http серверу и в thread варианте даст огромный выигрыш.
Т.е. без учета специфики задачи — в многопоточность/многопроцессорность — лучше просто не соваться.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *