aiohttp ile 1 milyon asenkron istek yapmak (çeviri)

Bu yazıda Python aiohttp'nin sınırlarını test etmek ve dakikalık istek performansını kontrol etmek istiyorum. Herkes, asenkron kodun network işlerinde kullanıldığında daha iyi performans gösterdiğini bilir, fakat bu varsayımın tam olarak nasıl ve neden bu kadar iyi olduğunu kontrol etmek hala ilgi çekici. Python aiohttp ile bir milyon istek yaparak bunu kontrol edeceğim. Dakikada kaç istek yapacak? Basit bir script komut ile bunu yapmaya çalıştığımda ne tarz çökme ve hatalar beklemeliyim? Böyle bir yoğunlukta istek yapmaya çalışırken düşünülmesi gereken ana başlıklar nelerdir?

Merhaba asyncio/aiohttp

Asenkron programlama kolay değildir. Kolay değildir çünkü callback'ler kullanır, olayların şartlarını ve olay işleyicilerini düşünmek normal programlamadan çok daha fazla efor gerektirir. Ayrıca bu biraz farklı çünkü asyncio hala nispeten yeni ve bu konuyla alakalı sadece birkaç blog gönderisi ve eğitim var. Konuyla alakalı resmi Python Belgeleri ise çok yoğun anlatımlı ve sadece basit örnekleri içeriyor. Stack Overflow üzerinde ise çok fazla soru yok. Burada, Şurada ve Şurada asyncio hakkında güzel blog yazıları var hatta belki de Şu, ve Şu da eklenmeli.

İşleri kolaylaştırmak için basit olandan başlayıp bir "Merhaba Dünya" örneği ile GET isteği yapıp tek bir HTTP yanıtını işleyelim. Senkron programlamada sadece şu şekilde yaparsınız;


      import requests
      def merhaba()
          return requests.get("http://httpbin.org/get")
      print(merhaba())
    

Peki bu aiohttp ile nasıl olur?


    #!/usr/local/bin/python3.5
    import asyncio
    from aiohttp import ClientSession

    async def merhaba():
        async with ClientSession() as session:
            async with session.get("http://httpbin.org/headers") as response:
                response = await response.read()
                print(response)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(merhaba())
    

Hmm böyle temel bir görev için çok fazla kod yazmak gerekiyor gibi.. Burada “async def”, “async with”, “await” var. İlk başta karmaşık gibi görünebilir, haydi açıklamaya çalışalım.

async deyimini fonksiyon tanımlarken kullandığımız def deyiminden önce kullanarak ve fonksiyon içerisinde await kullanarak bir fonksiyonu asenkron hale getirirsiniz. Aslında merhaba() işlevi çalışırken iki asenkron operasyon gerçekleşir. Önce asenkron olarak yanıt getirilir, sonra asenkron olarak yanıt gövdesi okunur.

Aiohttp, istekler yapmak için birincil arabirim olarak ClientSession metodunu önerir. ClientSession iki istek arasındaki çerezleri saklayabilme imkanı verir ve tüm istekler için ortak olan nesneleri tutar (event loop, bağlantı ve diğer şeyler). Session'ın kullanıldıktan sonra kapatılması gerekir ve oturum kapamak bir başka asenkron işlemdir. Bu yüzden async deyimine ihtiyacınız var ve her seferinde session'lar ile uğraşmanızın nedeni bu.

client session açtıktan sonra bunları istek yapmak üzere kullanabilirsiniz. Burası isteği indiren asenkron operasyonun başladığı yerdir. Client session'ların yanıtı durumunda olduğu gibi açıkca kapatılması gerekir ve bağlam yöneticisi olan with her koşulda düzgün kapatılmasını sağlar.

Programı başlatmak için event loop (olay döngüsü)'nü çalıştırmanız gerekir ve bunu yapmak için asyncio instance'ı yaratmanız ve bu döngünün içine görevi koymanız gerekir.

Tüm bunlar biraz zor geliyor olabilir fakat anlamaya çalışırken biraz zaman harcamak mantıklı görünüyor.

Çoklu url getirmek

Şimdi daha ilginç bir şey deneyelim ve birbirini takip eden isteklerle çoklu urller getirelim. Senkron (Bildiğimiz normal çalışan) kod aşağıdaki gibi oluyor.


    for url in urls:
      print(requests.get(url).text)
    

Bu kodu yazması ve kullanması çok kolay, fakat Asenkron bu kodar kolay olmayacak. Bu yüzden daha karmaşık bir şeye gerçekten ihtiyacınız olup olmadığını her zaman düşünmelisiniz. Uygulamanız senkron kod ile düzgün çalışıyorsa belki de asenkron kod ile daha karmaşık hale getirmeye gerek olmayabilir. Eğer zahmetli olan yolu seçip Asenkron kod yazmak isterseniz yapmanız gereken şey aşağıda. Bizim merhaba() fonksiyonumuz hala aynı kalacak fakat bizim tüm listeyi görev olarak asyncio Future nesnesi haline getirip çalıştırılmak üzere loop'a vermemiz gerekiyor.


      loop = asyncio.get_event_loop()
      tasks = []
      
      url = "http://localhost:8080/{}"
      for i in range(5):
          task = asyncio.ensure_future(merhaba(url.format(i)))
          tasks.append(task)
      loop.run_until_complete(asyncio.wait(tasks))
    

Şimdi diyelim ki tüm yanıtları toplamak ve onlara bazı ön işlemler uygulamak istiyoruz. Şu an biz hiçbir yerde yanıt gövdesini tutmuyoruz, sadece ekrana basıyoruz. Haydi yanıt olarak döndürelim ve en sonda hepsini basalım.

Yanıtları bir gruba toparlamak için muhtemelen aşağıdaki gibi bir şey yapmak gerekir.


      #!/usr/local/bin/python3.5
      import asyncio
      from aiohttp import ClientSession

      async def fetch(url):
          async with ClientSession() as session:
              async with session.get(url) as response:
                  return await response.read()

      async def run(loop,  r):
          url = "http://localhost:8080/{}"
          tasks = []
          for i in range(r):
              task = asyncio.ensure_future(fetch(url.format(i)))
              tasks.append(task)

          responses = await asyncio.gather(*tasks)
          print_responses(responses)

      def print_responses(result):
          print(result)

      loop = asyncio.get_event_loop()
      future = asyncio.ensure_future(run(loop, 4))
      loop.run_until_complete(future)
    

asyncio.gather(), Future nesnelerini bir yerde toplar ve hepsi bitinceye kadar bekler.

Genel Hatalar

Şimdi gerçek bir öğrenme sürecini simüle edelim ve bir hata yapalım, bu amaca uygun bir örnek görmek için yardımcı olabilir.

Bozuk bir asenkron fonksiyon aşağıdaki gibi görünebilir;


      # Çalışmayan Asenkron Kodu
      async def fetch(url):
          async with ClientSession() as session:
              async with session.get(url) as response:
                  return response.read()
    

Bu kod bozuk ama asyncio hakkında pek bir şey bilmeden hatayı anlamaya çalışmak kolay değil. Python dilini iyi biliyor olsanız bile yeteri kadar asyncio ve aiohttp konularına hakim değilseniz burada gerçekte neler olduğunu anlamanız zor.

Bu fonksiyonun çıktısı nedir?

Fonksiyon bu çıktıyı üretiyor;


      pawel@pawel-VPCEH390X ~/p/l/benchmarker> ./bench.py 
      [<generator object ClientResponse.read at 0x7fa68d465728>, <generator object ClientResponse.read at 0x7fa68cdd9468>]
    

Burada ne oluyor? İşlemler bittiğinde yanıtlara ait objeleri bekliyorsunuz fakat burada Generator nesneleri var.

Bu bu şekilde oluyor çünkü önce de belirttiğim gibi response.read() bir asenkron operasyon. Bunun anlamı, sonucu hemen döndürmüyor, döndürdüğü şey sadece generator. Bu generator'lerin hala çağrılmaya ve çalıştırılmaya ihtiyacı var ve bu default olarak yapılmıyor. Python 3.4'de eklenen yield from ve Python 3.5'de eklenen await tam olarak bu amaçla geldi: Generator objesini iterate etmek için. Yukarıdaki hatayı düzeltmek için response.read() önüne await eklemek yeterli olacak.


      return await response.read()
    

Haydi bu sefer de kodu bir başka yoldan bozalım.


      #hatalı kod
      async def run(loop,  r):
      url = "http://localhost:8080/{}"
      tasks = []
      for i in range(r):
          task = asyncio.ensure_future(fetch(url.format(i)))
          tasks.append(task)

      responses = asyncio.gather(*tasks)
      print(responses)
    

Dediğim gibi, yukarıdaki hatayı da asyncio bilmeden anlamak biraz zor olabilir.

Yukarıdaki işlemin çıktısı şu;


      pawel@pawel-VPCEH390X ~/p/l/benchmarker> ./bench.py 
      <_GatheringFuture pending>
      
      Task was destroyed but it is pending!
      task: <Task pending coro=<fetch() running at ./bench.py:7> wait_for=<Future pending 
        cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(0)() at /usr/local/lib/python3.5/asyncio/tasks.py:602]>
      
      Task was destroyed but it is pending!
      task: <Task pending coro=<fetch() running at ./bench.py:7> wait_for=<Future pending 
        cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(1)() at /usr/local/lib/python3.5/asyncio/tasks.py:602]>
      
      Task was destroyed but it is pending!
      task: <Task pending coro=<fetch() running at ./bench.py:7> wait_for=<Future pending 
        cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(2)() at /usr/local/lib/python3.5/asyncio/tasks.py:602]>
      
      Task was destroyed but it is pending!
      task: <Task pending coro=<fetch() running at ./bench.py:7> wait_for=<Future pending 
        cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(3)() at /usr/local/lib/python3.5/asyncio/tasks.py:602]>
    

Burada ne oldu? Karşı tarafın loglarına bakabilseydiniz aslında isteklerin onlara ulaşmadığını görürdünüz. Açıkcası hiçbir istek yapılmadı. Print deyimi <_GatheringFuture pending> objesini içeren bir değişkeni bastı. Daha sonra bekleyen görevler yok edildi ve bildirildi. Neden böyle oldu? Çünkü await'i yine unuttuk.

Hatalı kısım şuydu;


            responses = asyncio.gather(*tasks)
    

Şöyle olmalı;


            responses = await asyncio.gather(*tasks)
    

Benim hatalar hakkındaki tahminim, eğer bir şeyler bekliyorsanız her zaman await kullanımını hatırlayın.

Senkron ve Asenkron

Nihayet biraz eğlence vakti. Haydi asenkronun gerçekten zor olup olmadığına bakalım. Asenkron ve bloklanan istemci arasındaki verimlilik farkı nedir? Asenkron istemcimden dakikada kaç istek yapabilirim? Kafamdaki bu sorular ile kendi aiohttp sunucumu kuracağım. Benim sunucum Marry Shelley'nin Frankenstein'ına ait tüm html belgesini okuyacak. Yanıtlar arasında rasgele gecikme süreleri oluşacak. Bu örnek gerçek hayattaki gibi olmalı, yanıt süreleri genellikle birbirinden farklıdır.

Sunucu kodu aşağıdaki gibi;


        #!/usr/local/bin/python3.5
        import asyncio
        from datetime import datetime
        from aiohttp import web
        import random

        random.seed(1)

        async def hello(request):
            isim = request.match_info.get("isim", "foo")
            n = datetime.now().isoformat()
            gecikme = random.randint(0, 3)
            await asyncio.sleep(gecikme)
            headers = {"content_type": "text/html", "gecikme": str(gecikme)}
            # burada dosyayı senkron olarak açıyorum. performansı iyileştirmek
            # için asyncio Executor'lerini kullanabilirsiniz.
            # https://docs.python.org/3/library/asyncio-eventloop.html#executor
            # https://pymotw.com/3/asyncio/executors.html
            with open("frank.html", "rb") as html_body:
                print("{}: {} gecikme: {}".format(n, request.path, gecikme))
                response = web.Response(body=html_body.read(), headers=headers)
            return response

        app = web.Application()
        app.router.add_route("GET", "/{isim}", hello)
        web.run_app(app)
    

Senkron kod ise şu şekilde;


        import requests
        r = 100

        url = "http://localhost:8080/{}"
        for i in range(r):
            res = requests.get(url.format(i))
            delay = res.headers.get("DELAY")
            d = res.headers.get("DATE")
            print("{}:{} delay {}".format(d, res.url, delay))
    

Çalışması ne kadar sürüyor? Benim bilgisayarımda senkron olarak çalışması 2:45.54 dakika aldı.

Aynı işlemi yapan asenkron kodum ise 0:03.48 dakika aldı.

Eğer ekrana yazdırılan sürelere bakarsanız asenkron istemcinin nasıl harika çalıştığını görürsünüz. Bazen 0 gecikme yaşanmış fakat bazen 3 saniye gecikme yaşanmış. Senkron kodda bunlar bloklanma ve bekleme sebebidir. Bu da makinanızın basitce boşta kalması anlamına geliyor. Asenkron kod hiç vakit kaybetmiyor. İstekler yapılır ve yanıtlar ayrıca işlenir. Açıkca görebilirsiniz ki, ilk istekler 0 gecikmelidir, sonraki yanıtlar ise 1 saniye gecikme ile en gecikmiş tepkiler gelene kadar görülebildi.

Sınırları Zorlamak

Şimdi bizim async client'ın daha iyi olduğunu biliyoruz. Haydi onun sınırlarını zorlayalım ve çökertmeyi deneyelim. Şimdi ona 1K istek yollayacağım. Kaçını işleyebilecek merak ediyorum.


        > time python3 bench.py

        2.68user 0.24system 0:07.14elapsed 40%CPU (0avgtext+0avgdata 53704maxresident)k
        0inputs+0outputs (0major+14156minor)pagefaults 0swaps
    

1K istek 7 saniye aldı, harika. Peki ya 10K? 10K istek yapmayı deniyorum fakat ne yazık ki hata alıyorum.


        responses are <_GatheringFuture finished exception=ClientOSError(24, 'Cannot connect to host localhost:8080 ssl:False [Can not connect to localhost:8080 [Too many open files]]')>
        Traceback (most recent call last):
          File "/home/pawel/.local/lib/python3.5/site-packages/aiohttp/connector.py", line 581, in _create_connection
          File "/usr/local/lib/python3.5/asyncio/base_events.py", line 651, in create_connection
          File "/usr/local/lib/python3.5/asyncio/base_events.py", line 618, in create_connection
          File "/usr/local/lib/python3.5/socket.py", line 134, in __init__
        OSError: [Errno 24] Too many open files
    

Kötü oldu. C10K problemine takıldım gibi görünüyor.

“too many open files” dediği şey muhtemelen açık socket sayısına işaret ediyor. Neden dosya çağırıyor ki? Soketler sadece dosya tanımlayıcılarıdır. İşletim sistemleri açık socket sayısını kısıtlar. Kaç dosya "too many"? Ben Python kaynakları ile test ettiğimde 1024 civarı olduğunu gördüm. Peki bunu nasıl aşabiliriz? İlkel yol açık dosya sayısını kısıtlamaktır. Fakat bu muhtemelen iyi bir yol değil. Yapılan eşzamanlı istek sayısını kısıtlarsak muhtemelen bu daha iyi bir yol olur. Görev sayısını 1000 ile sınırlamak için koda asyncio.Semaphore() ekleyeceğim.

Düzenlediğim kod şimdi böyle görünüyor;


        import random
        import asyncio
        from aiohttp import ClientSession

        async def fetch(url):
            async with ClientSession() as session:
                async with session.get(url) as response:
                    delay = response.headers.get("DELAY")
                    date = response.headers.get("DATE")
                    print("{}:{} with delay {}".format(date, response.url, delay))
                    return await response.read()


        async def bound_fetch(sem, url):
            # getter function with semaphore
            async with sem:
                await fetch(url)


        async def run(loop,  r):
            url = "http://localhost:8080/{}"
            tasks = []
            # create instance of Semaphore
            sem = asyncio.Semaphore(1000)
            for i in range(r):
                # pass Semaphore to every GET request
                task = asyncio.ensure_future(bound_fetch(sem, url.format(i)))
                tasks.append(task)

            responses = asyncio.gather(*tasks)
            await responses

        number = 10000
        loop = asyncio.get_event_loop()

        future = asyncio.ensure_future(run(loop, number))
        loop.run_until_complete(future)
    

Bu noktada ben 10k url işleyebiliyorum ve 23 saniye alıyor.

Peki ya 100.000? Bunun için bilgisayarım gerçekten zorlanacak fakat şaşırtıcı şekilde çalışıyor. Ram kullanımı yüksek ve CPU 100% civarında çalışıyor. İlginç olan şey ise istemci tarafında çok daha az tüketilmesi. Aşağıda ps çıktısı var;


        pawel@pawel-VPCEH390X ~/p/l/benchmarker> ps ua | grep python

        USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
        pawel     2447 56.3  1.0 216124 64976 pts/9    Sl+  21:26   1:27 /usr/local/bin/python3.5 ./test_server.py
        pawel     2527  101  3.5 674732 212076 pts/0   Rl+  21:26   2:30 /usr/local/bin/python3.5 ./bench.py
    

Bazı nedenlerden dolayı bozulmadan önce 5 dakika kadar çalıştı. Bazı yanıtlar kapatılmamış gibi görünüyor, benim server kodunda veya istemci kodunda bir hata olmuş olabilir mi?

Biraz scroll'ladıktan sonra client log'larında şu exception'ı buluyorum.


        File "/usr/local/lib/python3.5/asyncio/futures.py", line 387, in __iter__
            return self.result()  # May raise too.
          File "/usr/local/lib/python3.5/asyncio/futures.py", line 274, in result
            raise self._exception
          File "/usr/local/lib/python3.5/asyncio/selector_events.py", line 411, in _sock_connect
            sock.connect(address)
        OSError: [Errno 99] Cannot assign requested address
    

Burada ne olmuş gerçekten bilmiyorum. İlk hipotezim test server'ımın bir anlığına düşmüş olduğu yönünde. Bir okuyucu bunun işletim sistemi tarafından kaynaklandığını düşünüyor. Ben semaphore'u önce ekledim ki eşzamanlı bağlantı sayısı maximum 1000 olsun, fakat bazı soketler belki de hala kapalı ve kernel üzerinde atama için kullanılamıyor.

Aslında tüm bunlar çok kötü değil. 5 dakika için 100K istek? dakikada 20K istek bana sorarsanız çok güçlü.

Nihayet ben 1 milyon istek yapmayı deniyorum. 52 dakikada tamamlanıyor.


        1913.06user 1196.09system 52:06.87elapsed 99%CPU (0avgtext+0avgdata 5194260maxresident)k
        265144inputs+0outputs (18692major+2528207minor)pagefaults 0swaps
    

Bu da benim istemcim dakikada 19230 istek yapıyor manasına geliyor. Kötü değil değil mi? Client'ın yeteneklerinin 0 ve 1 saniye gecikme süresi arasında sınırlı olduğunu unutmayın. Ayrıca test server'ı sessizce birkaç kere patlamış görünüyor.

Sonuç

Gördüğünüz gibi asenkron HTTP istemcileri süper güçlü. Asenkron istemci için istek sayısının 1 milyon olmasının bir önemi yok. Ve işini gayet iyi yapıyor.

Ben diğer diller ve frameworkler ile karşılaştırmalar nasıl sonuç verir merak ediyorum. Belki de yakın zamanda Twisted Treq ile aiohttp'yi test edip bir blog post yayınlarım. Burada sorulan sorular diğer diller ve frameworkler için de sorulabilir. Örneğin, Javadaki asenkron frameworklerinin performans sonuçları nasıldır? veya C++ frameworklerinde? veya Rust HTTP istemcilerinde?

Kaynak : "Making 1 million requests with python-aiohttp", Paweł Miech, 2016