Montando un tracker de torrent con twisted

Hace unas semanas supe de Twisted, una librería asíncrona de programación en red para python... lo que viene siendo node.js para python.

Entre otros muchos protocolos (incluyendo SSH, cliente y servidor) que soporta está por supuesto HTTP, lo que nos permite montar un servidor web dinámico sin necesidad de pararse a configurar apache, nginx o algo por el estilo, simplemente haciendo

pip install twisted

Y que aplicación web se puede montar rápidamente en un servidor web? un tracker de Bittorrent!

Para quien no lo sepa, el papel del tracker de torrent es mantener las listas de los clientes que buscan cada uno de los torrent, han dejado de ser imprescindibles gracias a las DHT (Distributed Hash Table) que dejan este papel a cargo de los propios clientes de la red, pero aún así siguen dando para un ejemplo pequeño y práctico.

Manos a la obra entonces, el primer paso será importar las librerías preparar el marco sobre el que trabajaremos

#!/usr/bin/env python
# coding: utf-8
from twisted.web import server, resource # Objetos referentes al protocolo HTTP
from twisted.internet import reactor     # "Motor" asíncrono de Twisted

class Tracker(resource.Resource):
    '''Cada objeto de esta clase representará un tracker.'''
    isLeaf = True # Declaramos que no pretende ser una superclase.

    def render_GET(self, request):
      '''Una petición GET al servidor
      (en este caso solo se usará esta, pero el patrón esta claro, no? :) ).'''
      return "Testing..."


if __name__ == "__main__":
    # Inicialización de un tracker como un servidor web
    tracker = server.Site(Tracker())
    # Inicialización del motor de Twisted
    reactor.listenTCP(8080, tracker)
    # Se pone a funcionar el servidor
    reactor.run()

Y... la parte que concierne a Twisted ya está, si lo ejecutamos ya tenemos un servidor web funcional, solo falta añadir el "tracking" de los torrent.

Bueno, no realmente, el funcionamiento de Twisted es de un hilo asíncrono, si una petición se bloquea (típicamente por IO bloqueante), TODA la aplicación se detiene hasta que se resuelva, para evitar esto se recurre a llamadas asíncronas usando callbacks (y posiblemente closures), algo típico también de AJAX por ejemplo, esto será lo que exploremos ahora.

Definamos rápidamente dos funciones para manejar el Bencode (codificación que utiliza Bittorrent) y volvamos a los que nos ocupa...

def bencode_dict(d):
    '''Convierte a Bencode un diccionario.
       Ojo, las claves tienen que estar ordenadas _lexicográficamente_.'''
    bencoded = []
    for key in sorted(d.keys(), key=str.upper):
        bencoded.append(bencode(key) + bencode(d[key]))

    return 'd' + (''.join(bencoded)) + 'e'


def bencode_list(l):
    '''Convierte a Bencode una lista.'''
    bencoded = []
    for element in l:
        bencoded.append(bencode(element))

    return 'l' + (''.join(bencoded)) + 'e'


def bencode_int(n):
    '''Convierte a Bencode un número.'''
    return 'i%ie' % n


def bencode_str(s):
    '''Convierte a Bencode una cadena.'''
    return '%i:%s' % (len(s), s)


def bencode(o):
    '''Convierte a Bencode un objeto.'''
    if type(o) in (str, unicode): return bencode_str(o).encode('ascii')
    elif type(o) == int:          return bencode_int(o).encode('ascii')
    elif type(o) == list:         return bencode_list(o).encode('ascii')
    elif type(o) == dict:         return bencode_dict(o).encode('ascii')
    raise Exception('Invalid type ' + str(type(o)) )

Bien, de nuevo con el tracker, el protocolo especifica que se pasen ciertos parámetros al tracker como opciones de la petición GET (a través de la URL, después del '?'), leámoslos entonces del `request' del render_GET

def render_GET(self, request):
    '''Una petición GET al servidor
       (en este caso solo se usará esta, pero el patrón esta claro, no? :) ).'''

    # Lectura de las opciones de la petición
    args = request.args

    # Identificador del torrent
    info_hash = args.get('info_hash', [None])[0]

    # Identificador del cliente (generado con cierta aleatoriedad)
    peer_id = args.get('peer_id', [None])[0]

    # Puerto del cliente torrent
    port = args.get('port', [None])[0]

    # Número total de bytes subidos (en ASCII base 10), por defecto 0
    uploaded = int(args.get('uploaded', [0])[0])

    # Número total de bytes descargados (en ASCII base 10), por defecto 0
    downloaded = int(args.get('downloaded', [0])[0])

    # Número de bytes restantes para completar todos los archivos del torrent (en ASCII base 10).
    left = args.get('left', [None])[0]

    # Respuesta compacta (se hablará de esto cuando se prepare la respuesta).
    compact = bool(int(args.get('compact', [False])[0]))

    # Se permite la omisión del peer_id de los clientes en la respuesta.
    no_peer_id = bool(args.get('no_peer_id', [False])[0])

    # `Started', `stopped' o `completed' (opcional).
    event = args.get('event', [None])[0]

    # IP de la máquina (opcional).
    ip = args.get('ip', [request.getClientIP()])[0]

    # Número de pares en la respuesta (opcional), por defecto 50.
    numwant = int(args.get('numwant', [50])[0])

    # Clave del cliente (por motivos de seguridad).
    key = args.get('key', [None])[0]

    # (Opcional) ID que fué enviada al cliente en una petición anterior.
    trackerid = args.get('trackerid', [None])[0]

    # Mantenemos el control del objeto request por que puede ser necesario más adelante.
    handle_request(request, info_hash, peer_id, port, uploaded, downloaded,
                   left, compact, no_peer_id, event, ip, numwant, key, trackerid)

    # La respuesta no volverá en este hilo
    return server.NOT_DONE_YET

Si, son bastantes líneas para solo extraer los parámetros pero ya queda todo ordenado, ahora vamos con lo interesante, hay que usar IO asíncrona para acceder a la base de datos, por simplificar vamos con Sqlite (aunque probablemente degrade bastante el rendimiento). En este punto hay un pequeño problema... el módulo sqlite3 de python es completamente síncrono, así que toca escribir una clase que lo envuelva y lo haga síncrono :P

import sqlite3
from threading import Thread

class AsyncSqlite(Thread):
    '''Conseguimos la asincronía usando varios hilos.'''

    def __init__(self, database, queries, callback):
        '''Constructor.'''
        Thread.__init__(self) # Importante también construir el Thread

        self.database = database
        self.queries = queries
        self.callback = callback


    def run(self):
        '''Ya ejecutado en otro hilo, hace las peticiónes y lanza el callback con los resultados resultado como parámetro.'''

        conn = sqlite3.connect(self.database)
        c = conn.cursor()

        results = []
        for query, params in self.queries:
            c.execute(query, params)
            results.append(c.fetchall())

        conn.commit()
        self.callback(results)
        conn.close()

Esto lo podemos probar directamente desde un terminal:

>>> import os
>>> from async_sqlite import AsyncSqlite
>>> os.chdir('/usr/share/doc/basic256/examples/')
>>> def show(data): print '\n'.join(map(str, data[0]))
... # El prompt volverá antes que el resultado
>>> AsyncSqlite('quotations.sqlite3', ('select * from quotes', ()), show).start()
>>> (1, u'Abraham Lincoln (1809 - 1865)', u'Most folks are about as happy as they make up their minds to be.')
(2, u'George Burns (1896 - 1996)', u'Happiness is having a large, loving, caring, close-knit family in another city.')
(3, u'Jean Houston', u'At the height of laughter, the universe is flung into a kaleidoscope of new possibilities.')
(4, u'Doug Larson', u'Home computers are being called upon to perform many new functions, including the consumption of homework formerly eaten by the dog.')
(5, u'Isaac Asimov', u'I do not fear computers. I fear the lack of them.')
(6, u'Pierre Gallois', u'If you put tomfoolery into a computer, nothing comes out of it but tomfoolery. But this tomfoolery, having passed through a very expensive machine, is somehow ennobled and no-one dares criticize it.')
(7, u'Robert Orben', u'To err is human--and to blame it on a computer is even more so.')
(8, u'Herm Albright (1876 - 1944)', u'A positive attitude may not solve all your problems, but it will annoy enough people to make it worth the effort.')
(9, u'William James (1842 - 1910)', u'The greatest discovery of my generation is that a human being can alter his life by altering his attitudes of mind.')
(10, u'Martha Washington (1732 - 1802)', u'I am still determined to be cheerful and happy, in whatever situation I may be; for I have also learned from experience that the greater part of our happiness or misery depends upon our dispositions, and not upon our circumstances.')

>>>

Así que, utilizando esta clase...

import sqlite3
DB_NAME = 'tracker.sqlite3'
MAX_PEERS = 200

# Creación de la tabla en caso de no existir
conn = sqlite3.connect(DB_NAME)
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS peers
             (info_hash BLOB(20), peer_id BLOB(20), ip VARCHAR(15), port INT, complete BOOLEAN)''')
conn.commit()
conn.close()


def handle_request(request, info_hash, peer_id, port, uploaded, downloaded,
                   left, compact, no_peer_id, event, ip, numwant, key, trackerid):
    '''Prepara la respuesta a una petición al tracker.'''

    def callback(data):
        '''Callback, por ahora no hará nada.'''
        print data

    binfo_hash = sqlite3.Binary(info_hash)
    AsyncSqlite(DB_NAME, (('''DELETE FROM peers
                                WHERE info_hash = ? AND peer_id = ?''',
                           (binfo_hash, peer_id)),
                          ('''INSERT INTO peers (info_hash, peer_id, ip, port, complete)
                                VALUES (?, ?, ?, ?)''',
                           (binfo_hash, peer_id, ip, port, 1 if left == 0 else 0)),
                          ('''SELECT count(1) as seeders FROM peers WHERE complete = 1''', ()),
                          ('''SELECT count(1) as leechers FROM peers WHERE complete = 0''', ()),
                          ('''SELECT peer_id, ip, port FROM peers
                                WHERE info_hash = ? AND peer_id != ?
                                ORDER BY RANDOM() LIMIT ?''',
                           (binfo_hash, peer_id, min(numwant, MAX_PEERS)))),
                callback).start()

Eso ya está, queda acabar el callback, por ahora escribamos los resultados tal como salen de la query y cerremos la request:

def callback(data):
    '''Mostramos los datos (no de la forma que quiere Bittorrent aún)
       y finalizamos la petición.'''
    request.write(str(data))
    request.finish()

Solo queda formatear los datos:

def callback(data):
    '''Mostramos los datos que Bittorrent necesita.'''
    seeders = data[2][0][0]
    leechers = data[3][0][0]
    peers = data[4]

    # Formato compacto
    if compact:
        peer_list = []
        for peer in peers: # El reduce convierte la IP a binario
            ip = reduce(lambda x, y: (int(x) << 8) + int(y), peer[1].split("."))
            peer_list.append(struct.pack("IH", ip, peer[2]))

        peer_structure = ''.join(peer_list)

    else:
        peer_structure = []
        for peer in peers:
            new_peer = {'ip': peer[1],
                        'port': peer[2]}

            if (peer[0] is not None) and (not no_peer_id):
                new_peer['peer_id'] = peer[0]

            peer_structure.append(new_peer)

    request.setHeader('Content-Type', 'text/plain')
    request.write(bencode({'interval': 5 * 60, # Segundos a esperar antes de la próxima petición
                           'complete': seeders,
                           'incomplete': leechers,
                           'peers': peer_structure}))

    request.finish()

Y eso es todo, tenemos un tracker de torrent funcional (aunque probablemente haya que pulir cosas aquí y allá...)

Just for fun » « nikto visto desde el servidor