Herkese merhaba,
Bu yazıda, her birinin gerçekleşmesi vakit alan çok sayıda işlemi aynı anda nasıl çalıştırabileceğimizle alakalı bir örnek yapacağız. Bu tür çoklu işlemleri aynı anda yapabilmemizi sağlayan üç yaklaşım çeşidinden bahsedeğim. Bu yöntemlerin ilki, threading
kütüphanesinin kullanımına dayanır ve yaygın olarak kullanılıyor. Yöntemlerden ikincisi, asyncio
kütüphanesinin kullanımına dayanıyor ve sanıyorum threading
’e göre daha az yaygın. Son yöntem ise, fonksiyonların bir generator
↔ coroutine
olarak kullanılmasına dayanıyor.
Bu üç yaklaşımın her biri için çalışan bir çoklu ilerleme çubuğu
sınıfı tanımlayalım.
Basitten karmaşığa doğru ilerleyelim. Öncelikle tek bir işlem için çalışan basit bir ilerleme çubuğu tanımlayacağım. Daha sonra da birden çok işlem için çalışabilen bir ilerleme çubuğu nesnesi oluşturmaya geçeceğiz.
Öncelikle tasarlayacağım nesnenin nasıl görüneceğine bir bakalım.
|▮▮▮▮▮▮▮▮▮▮▮▮▮ | - Completed: 52 % - Speed: 12065 - Elapsed: 4 - ETA: 4
Tek bir işlem için yukarıdaki gibi bir ilerleme çubuğu görmemiz lazım.
Peki, yukarıdaki ilerleme çubuğunda ne türden veriler yer alıyor bir göz atalım:
- Çubuk uzunluğu: Ön-tanımlı olarak
25
birim olarak seçildi.
- Tamamlanma yüzdesi:
length = int(25 * islem_sirasi / len(toplam_islem))
- An ve an çubukta yer alacak
▮
işareti sayısı): filled = "▮" * length
- An ve an çubukta yer alacak boşluk sayısı:
unfilled = " " * (25 - length)
- Geçen zaman:
elapsed = int(simdiki_zaman - islem_baslama_zamani)
- Hız:
speed = int(islem_sirasi / (simdiki_zaman - islem_baslama_zamani))
- Kalan zaman:
eta = int(len(toplam_islem) * (simdiki_zaman - islem_baslama_zamani) / islem_sirasi) - int(simdiki_zaman - islem_baslama_zamani)
Yukarıdaki kavramları bir araya getirerek kaydırma çubuğunun temellerini oluşturmaya başlayalım. Dediğim gibi önce basit bir ilerleme çubuğu oluşturalım. Sonra bu ilerleme çubuğunu gerçek işlemlerin ilerleme aşamalarını görüntüleyebilecek bir hale getirelim.
import time
def progress(iterable, index, start_time):
now = time.perf_counter()
length = int(25 * index / len(iterable))
filled = "\u25ae" * length
unfilled = (25 - length) * " "
completed = length * 4
elapsed = int(now - start_time)
speed = int(index / (now - start_time))
try:
eta = int(len(iterable) * (now - start_time) / index) - int(now - start_time)
except ZeroDivisionError:
eta = 0
return f"|{filled}{unfilled}| - Completed: {completed} % - Speed: {speed} - Elapsed: {elapsed} - ETA: {eta}"
def main():
iterable = range(100000)
start_time = time.perf_counter()
for i in iterable:
print("\r" + progress(iterable, i + 1, start_time), end="")
print()
main()
Yukarıda yer alan kodları çalıştırırsanız, ilerleme çubuğunun aynı satırda değişime uğradığını göreceksiniz. ilerleme çubuğu
ndaki değişimi sağlamak için yazdıracağımız stringin başına \r
karakter dizisini getirdik ve print
fonksiyonunun end
parametresine boş bir string yazdık.
Bu yaklaşım eğer karakter dizisi tek bir satırdan oluşuyorsa geçerli bir yaklaşımdır. Ancak birden çok satırı olan karakter dizileri için \r
karakter dizisi ekran yenileme işlemini yapamayacaktır. Çok satırlı karakter dizilerini güncellemek için başka karakter dizileri kullanmamız lazım. Bunlardan birisi, ekrandaki karakter dizilerini silmek için kullanacağımız bir ansi
karakter dizisidir. Diğeri de imleci ekranın üstüne tekrar taşımak için kullanılacak bir ansi
karakter dizisidir.
clear = "\033[2J"
start = "\x1B[1A"
Bu iki karakter dizisini nasıl kullanacağız peki?
start = "\x1B[1A"
karakter dizisine göz atacak olursak, bu karakter dizisinde sondan ikinci sırada yer alan 1
karakteri, 0
satır anlamına gelmektedir. Yani eğer sadece bir satırlık bir karakter dizisini ekrana yazdıracaksak start = "\x1B[1A"
ifadesini start = "\x1B[2A"
şeklinde değiştirmemiz gerekir. O halde karakter dizimiz içindeki her bir satır için 1A
ifadesindeki 1
, 1
birim artırılmalıdır.
Ekrana yazdıracağımız metnin başına start
’ı ekleyeceğiz. Sonra da her bir satır için temizleme işlemini yapıp, ilgili satırı yeniden yazdıracağız.
Şimdi tek bir işlem için üstüne yazma işlemini yeniden tasarlayalım. Yukarıdaki kodlardaki main
fonksiyonuna clear
ve start
değişkenlerini ekliyorum ve print
ifadesini print(clear + start + progress(iterable, i + 1, start_time))
olacak şekilde değiştiriyorum.
import os
import time
if os.name == "nt":
os.system("")
def progress(iterable, index, start_time):
now = time.perf_counter()
length = int(25 * index / len(iterable))
filled = "\u25ae" * length
unfilled = (25 - length) * " "
completed = length * 4
elapsed = int(now - start_time)
speed = int(index / (now - start_time))
try:
eta = int(len(iterable) * (now - start_time) / index) - int(now - start_time)
except ZeroDivisionError:
eta = 0
return f"|{filled}{unfilled}| - Completed: {completed} % - Speed: {speed} - Elapsed: {elapsed} - ETA: {eta}"
def main():
iterable = range(100000)
start_time = time.perf_counter()
clear = "\033[2J"
start = "\x1B[2A"
for i in iterable:
print(clear + start + progress(iterable, i + 1, start_time))
main()
Bu kodları çalıştırırsanız ekran tazeleme işleminin gerçekleştiğini görürsünüz. os.system("")
ifadesi, Windows konsolunda ANSI kaçış dizilerinin kullanılabilmesini sağlar.
Peki iki tane işlemin ilerlemesini nasıl gösterirdik?
Önce, start
’ın değerini “\x1B[3A” yapıyorum. Çünkü iki satır kullanacağım. Sonra da print
ifadesinin içini şöyle değiştiriyorum:
print(clear + start + progress(iterable, i + 1, start_time) + "\n" + progress(iterable, i + 1, start_time))
O halde main
fonksiyonumuz şöyle değişiyor:
def main():
iterable = range(100000)
start_time = time.perf_counter()
clear = "\033[2J"
start = "\x1B[3A"
for i in iterable:
print(clear + start + progress(iterable, i + 1, start_time) + "\n" + progress(iterable, i + 1, start_time))
Buraya kadar olan kısım anlaşıldıysa artık ilerleme çubuğumuzu daha ayrıntılı bir şekilde tasarlamaya geçebiliriz.
Bir sonraki kısımda ilerleme çubuğunu gerçek zamanlı işlemleri ölçebilecek bir hale getirelim. Yukarıda tek bir for
döngüsü içinde iki tane progress
’i aynı anda nasıl çağırabileceğimizi gördük. Ama ilerleme çubuklarının işlemler üzerinde etkili olması için başka bir algoritmaya ihtiyacımız var. Şimdi gelin yavaş yavaş kod tasarım şeklimizi değiştirelim. Ve sınıf kullanmaya başlayalım.
Öncelikle son paylaştığım koddaki print
ifadesinde yer alan progress
’leri tek tek yazmak yerine onları bizim seçimlerimize göre değişebilir bir hale getirelim.
def bar(progress, start, clear):
return clear + start + f"\n".join([prog["progress"] for prog in progress])
bar
fonksiyonu sayesinde, her bir ilerleme çubuğunu hazır hale getirelim.
Şimdi elimizde progress
ve bar
isimli iki tane fonksiyon var. Bu fonksiyonları bir sınıfın içine yerleştirelim ve fonksiyonları sınıf yapısına uygun hale getirelim.
import os
import time
import asyncio
import threading
if os.name == "nt":
os.system("")
class PBar:
def __init__(self):
self._progress = []
self._start = "\x1B[1A"
self._clear = "\033[2J"
self._t = 0
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
def __bar__(self, event, index, size):
self._progress[event]["progress"] = self.__progress__(size=size, index=index)
print(self._clear + self._start + "\n".join([i["progress"] for i in self._progress]))
def __progress__(self, size: int, index: int):
now = time.perf_counter()
length = int(25 * index / size)
filled = "\u25ae" * length
unfilled = (25 - length) * " "
completed = length * 4
elapsed = int(now - self._t)
speed = int(index / (now - self._t))
try:
eta = int(size * (now - self._t) / index) - int(now - self._t)
except ZeroDivisionError:
eta = 0
return f"|{filled}{unfilled}| - Completed: {completed} % - Speed: {speed} - Elapsed: {elapsed} - ETA: {eta}"
Yukarıdaki kodu izninizle açıklamaya çalışayım:
- Daha önce yazdığım
bar
ve progress
fonksiyonlarını PBar
isimli bir sınıfın içine taşıdık.
- İlerleme çubuklarının bilgilerini tutacak
self._progress
isimli bir örnek niteliği oluşturduk.
self._progress
isimli örneğin, self.__bar__
içinde değişecek şekilde düzenledik.
self._t
isimli start_time
’ın yerine bir örnek niteliği oluşturduk.
start
ve clear
değişkenlerini sınıf içine aldık.
Şimdi, her bir işlemi eklemek için bir örnek fonksiyonu tanımlıyorum. Bu fonksiyonun üç parametresi olacak. Çalıştırmak istediğimiz fonksiyon için target
parametresi, veri boyutu için size
parametresi ve diğer argümanlar için args
adlı bir parametre.
[...]
def add_progress(self, target, size, args):
event = len(self._progress)
progress = {
"progress": self.__progress__(size=size, index=0),
"target": lambda args=args, event=event, size=size: target(bar=lambda index: self.__bar__(event=event, index=index, size=size), **args)
}
self._progress += [progress]
self._start = self._start[:-2] + str(int(self._start[-2]) + 2) + self._start[-1]
[...]
Her bir işlem, progress
adlı bir sözlük verisi ile temsil edilecek. Her bir işlemin ilerleme çubuğu progress
adlı anahtar ile, her bir işlemin hedef fonksiyonu ise target
adlı anahtar ile temsil edilecek.
target
isimli hedef fonksiyonun bar
isimli bir parametresi olduğunu görüyorsunuz. Bu parametreye lambda
tipinde, parametresi index
olan bir fonksiyon atanmış. Bu lambda
fonksiyonu da self.__bar__(event, index, size)
fonksiyonunu çağırıyor. Yani bizim hedef fonksiyonda bar
olarak kullanacağımız fonksiyonu burada tanımlıyoruz. add_progress
ile ilerleme çubuğu atama işlemi yaptığımız esnada, self.__bar__
fonksiyonunun size
ve event
parametrelerinin argümanlarını belirlemiş oluyoruz. index
parametresine ise hedef
fonksiyonda argüman vereceğiz.
Her işlem eklediğimizde self._start
’ın sondan ikinci karakterini bir birim artıracağız.
Şimdi bütün ilerleme çubuklarını çalıştıracak mekanizmayı her üç yaklaşıma göre de oluşturalım.
generator <-> coroutine için yürütücü
[...]
def run_generator(self):
index = 0
self._t = time.perf_counter()
coroutines = {}
while not all("100 %" in i["progress"] for i in self._progress):
try:
if index not in coroutines:
coroutines[index] = self._progress[index]["target"]()
coroutines[index].send(None)
except StopIteration:
pass
index += 1
if index == len(self._progress):
index = 0
[...]
asyncio için yürütücü
[...]
async def __run_async__(self):
self._t = time.perf_counter()
await asyncio.gather(*[self._loop.create_task(progress["target"]()) for progress in self._progress])
self._loop.stop()
def run_async(self):
self._loop.run_until_complete(self.__run_async__())
[...]
threading için yürütücü
[...]
def run_thread(self):
threads = []
self._t = time.perf_counter()
for i in self._progress:
thread = threading.Thread(target=i["target"], daemon=True)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
[...]
Kodların hepsini tek bir seferde bir daha yazayım:
import os
import time
import asyncio
import threading
if os.name == "nt":
os.system("")
class PBar:
def __init__(self):
self._progress = []
self._start = "\x1B[1A"
self._clear = "\033[2J"
self._t = 0
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
def __bar__(self, event, index, size):
self._progress[event]["progress"] = self.__progress__(size=size, index=index)
print(self._clear + self._start + "\n".join([i["progress"] for i in self._progress]))
def __progress__(self, size: int, index: int):
now = time.perf_counter()
length = int(25 * index / size)
filled = "\u25ae" * length
unfilled = (25 - length) * " "
completed = length * 4
elapsed = int(now - self._t)
speed = int(index / (now - self._t))
try:
eta = int(size * (now - self._t) / index) - int(now - self._t)
except ZeroDivisionError:
eta = 0
return f"|{filled}{unfilled}| - Completed: {completed} % - Speed: {speed} - Elapsed: {elapsed} - ETA: {eta}"
def add_progress(self, target, size, args):
event = len(self._progress)
progress = {
"progress": self.__progress__(size=size, index=0),
"target": lambda args=args, event=event, size=size: target(bar=lambda index: self.__bar__(event=event, index=index, size=size), **args)
}
self._progress += [progress]
self._start = self._start[:-2] + str(int(self._start[-2]) + 2) + self._start[-1]
def run_thread(self):
threads = []
self._t = time.perf_counter()
for i in self._progress:
thread = threading.Thread(target=i["target"], daemon=True)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
async def __run_async__(self):
self._t = time.perf_counter()
await asyncio.gather(*[self._loop.create_task(progress["target"]()) for progress in self._progress])
self._loop.stop()
def run_async(self):
self._loop.run_until_complete(self.__run_async__())
def run_generator(self):
index = 0
self._t = time.perf_counter()
coroutines = {}
while not all("100 %" in i["progress"] for i in self._progress):
try:
if index not in coroutines:
coroutines[index] = self._progress[index]["target"]()
coroutines[index].send(None)
except StopIteration:
pass
index += 1
if index == len(self._progress):
index = 0
Önce generator <-> coroutine
yaklaşımıyla ilerleme çubukları ve görevler oluşturalım. Yukarıdaki kodları progressbar.py
isimli bir dosyaya yerleştiriyorum ve başka bir python dosyası açıp içine şunları yazıyorum:
Örnek-1.1. generator <-> coroutine
from progressbar import PBar
def target(bar, array, container):
for i in array:
bar(index=i + 1)
container.append(i ** 2)
yield
c1, c2, c3 = [], [], []
pbar = PBar()
for i, c in enumerate([c1, c2, c3], 1):
pbar.add_progress(target=target, size=10000 * i, args={"array": range(10000 * i), "container": c})
pbar.run_generator()
print(c1)
print(c2)
print(c3)
Örnek-1.2. asyncio
from progressbar import PBar, asyncio
async def target(bar, array, container):
for i in array:
bar(index=i + 1)
container.append(i ** 2)
await asyncio.sleep(0)
c1, c2, c3 = [], [], []
pbar = PBar()
for i, c in enumerate([c1, c2, c3], 1):
pbar.add_progress(target=target, size=10000 * i, args={"array": range(10000 * i), "container": c})
pbar.run_async()
print(c1)
print(c2)
print(c3)
Örnek-1.3. threading
from progressbar import PBar
def target(bar, array, container):
for i in array:
bar(index=i + 1)
container.append(i ** 2)
c1, c2, c3 = [], [], []
pbar = PBar()
for i, c in enumerate([c1, c2, c3], 1):
pbar.add_progress(target=target, size=10000 * i, args={"array": range(10000 * i), "container": c})
pbar.run_thread()
print(c1)
print(c2)
print(c3)
Çıktı:
|▮▮▮▮▮▮▮▮▮▮▮▮▮▮▮▮▮▮▮▮▮▮▮▮▮| - Completed: 100 % - Speed: 20389 - Elapsed: 4 - ETA: 0
|▮▮▮▮▮▮▮▮▮▮▮▮▮▮ | - Completed: 56 % - Speed: 21101 - Elapsed: 5 - ETA: 4
|▮▮▮▮▮▮▮▮▮ | - Completed: 36 % - Speed: 21101 - Elapsed: 5 - ETA: 9
Burada add_progress
fonksiyonunun 3 temel argümanı vardır.
target
: Çalıştırılacak hedef fonksiyondur.
size
: İterasyon sayısını temsil eder.
args
: Hedef fonksiyonun aldığı parametreler.
Hedef fonksiyonun argümanlarını oluştururken bar
isimli bir parametre tanımlıyoruz. Bu bar
parametresi tıpkı tkinter
’deki event
gibi çalışır. Her bir işlemi bir event
gibi ele alır ve o işlemle ilgili olan ilerleme çubuğunda değişiklik yapılmasını sağlar. Ayrıca bütün işlemlerin ekrana yazdırılmasını da bu fonksiyon sağlar. bar
fonksiyonunun index
isimli bir parametresi var. Bu parametreye argüman olarak iterasyon numarası yazılır.
Şimdi gelin, ilerleme çubuğu nesnemizi gerçek bir örnek için kullanalım.
Son zamanlarda dikkatimi çeken bir konu olan Epstein
konusunu araştırıyordum ve davayla alakalı dokümanların bulunduğu, halka açık sitedeki bütün mahkeme kaydı tutanaklarını indirmeyi düşündüm.
Dava dosyalarını indirmek için selenium
kütüphanesini kullanacağım. Dava dosyaları şimdilik 8 tane sayfada yer alıyor. Her bir sayfada değişik sayıda pdf dosyası var.
Şimdi generator <-> coroutine
yöntemiyle her bir sayfada yer alan pdf linklerini çekebileceğimiz bir fonksiyon yazalım. Sonra bu fonksiyonda ufak değişiklikler yaparak threading
ve asyncio
kütüphanelerine uygun hale getirelim.
Örnek-2.1. generator <-> coroutine
import queue
from progressbar import PBar, threading, time, asyncio
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
def get_elements_per_page(q, i):
options = Options()
options.add_argument("-headless")
driver = webdriver.Firefox(options=options)
url = f"https://www.courtlistener.com/docket/4355835/giuffre-v-maxwell/?order_by=desc&page={i}"
driver.get(url)
time.sleep(5)
elements = driver.find_elements(By.TAG_NAME, "a")
q.put([driver, elements])
def get_pdf_links(bar, driver, elements, pdfs):
for index, element in enumerate(elements):
inner = element.get_attribute("innerHTML").strip()
if inner == "Download PDF":
sep = "447706."
url = element.get_attribute("href")
filename = url.split(sep)[-1]
pdfs[filename] = url
bar(index=index + 1)
yield
driver.close()
q = queue.Queue()
threads = []
for page in range(1, 9):
thread = threading.Thread(target=get_elements_per_page, args=(q, page), daemon=True)
threads.append(thread)
thread.start()
pdfs = {}
pbar = PBar()
for thread in threads:
thread.join()
driver, elements = q.get()
pbar.add_progress(target=get_pdf_links, size=len(elements), args={"driver": driver, "elements": elements, "pdfs": pdfs})
pbar.run_generator()
print(pdfs)
Örnek-2.2. asyncio
import queue
from progressbar import PBar, threading, time, asyncio
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
def get_elements_per_page(q, i):
options = Options()
options.add_argument("-headless")
driver = webdriver.Firefox(options=options)
url = f"https://www.courtlistener.com/docket/4355835/giuffre-v-maxwell/?order_by=desc&page={i}"
driver.get(url)
time.sleep(5)
elements = driver.find_elements(By.TAG_NAME, "a")
q.put([driver, elements])
async def get_pdf_links(bar, driver, elements, pdfs):
for index, element in enumerate(elements):
inner = element.get_attribute("innerHTML").strip()
if inner == "Download PDF":
sep = "447706."
url = element.get_attribute("href")
filename = url.split(sep)[-1]
pdfs[filename] = url
bar(index=index + 1)
await asyncio.sleep(0)
driver.close()
q = queue.Queue()
threads = []
for page in range(1, 9):
thread = threading.Thread(target=get_elements_per_page, args=(q, page), daemon=True)
threads.append(thread)
thread.start()
pdfs = {}
pbar = PBar()
for thread in threads:
thread.join()
driver, elements = q.get()
pbar.add_progress(target=get_pdf_links, size=len(elements), args={"driver": driver, "elements": elements, "pdfs": pdfs})
pbar.run_async()
print(pdfs)
Örnek-2.3. threading
import queue
from progressbar import PBar, threading, time, asyncio
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
def get_elements_per_page(q, i):
options = Options()
options.add_argument("-headless")
driver = webdriver.Firefox(options=options)
url = f"https://www.courtlistener.com/docket/4355835/giuffre-v-maxwell/?order_by=desc&page={i}"
driver.get(url)
time.sleep(5)
elements = driver.find_elements(By.TAG_NAME, "a")
q.put([driver, elements])
def get_pdf_links(bar, driver, elements, pdfs):
for index, element in enumerate(elements):
inner = element.get_attribute("innerHTML").strip()
if inner == "Download PDF":
sep = "447706."
url = element.get_attribute("href")
filename = url.split(sep)[-1]
pdfs[filename] = url
bar(index=index + 1)
driver.close()
q = queue.Queue()
threads = []
for page in range(1, 9):
thread = threading.Thread(target=get_elements_per_page, args=(q, page), daemon=True)
threads.append(thread)
thread.start()
pdfs = {}
pbar = PBar()
for thread in threads:
thread.join()
driver, elements = q.get()
pbar.add_progress(target=get_pdf_links, size=len(elements), args={"driver": driver, "elements": elements, "pdfs": pdfs})
pbar.run_thread()
print(pdfs)
Çıktı:
|▮▮▮▮▮▮ | - Completed: 24 % - Speed: 78 - Elapsed: 6 - ETA: 18
|▮▮▮▮▮ | - Completed: 20 % - Speed: 76 - Elapsed: 6 - ETA: 24
|▮▮▮▮▮▮ | - Completed: 24 % - Speed: 78 - Elapsed: 6 - ETA: 18
|▮▮▮▮▮ | - Completed: 20 % - Speed: 78 - Elapsed: 6 - ETA: 20
|▮▮▮▮▮ | - Completed: 20 % - Speed: 77 - Elapsed: 6 - ETA: 25
|▮▮▮▮▮ | - Completed: 20 % - Speed: 78 - Elapsed: 6 - ETA: 25
|▮▮▮▮▮▮▮▮ | - Completed: 32 % - Speed: 78 - Elapsed: 6 - ETA: 13
|▮▮ | - Completed: 8 % - Speed: 76 - Elapsed: 6 - ETA: 53
Bu konuyla ilgili olarak anlatacaklarımın sonuna geldik. Bu makalede, generator <-> coroutine
, asyncio
ve threading
yaklaşımları ile farklı olayları nasıl yaklaşık eş-zamanlı
olarak oluşturabileceğimiz ile alakalı örnekler yapmaya çalıştık. Birçok işlemin ilerleme durumunu ölçen bir progressbar
tasarladık. Birden çok satırı olan karakter dizilerinin taşmadan ekrana yazdırılabilmesi için \r
karakter dizisi yerine start=up
ve clear
ansi karakterlerini nasıl kullanabileceğimizi gördük. Güncel bir örnek üzerinden selenium
yardımıyla 8 tane sayfada bulunan bütün pdf linklerini aynı anda toplamaya çalıştık.
Umarım faydalı bir alıştırma olmuştur. Açıklamamı istediğiniz yer olursa, lütfen belirtin, elimden geldiğince açıklamaya çalışırım.
Herkese iyi günler.
Edit: Ekran temizleme işlemi için ansi karakteri değiştirildi.