scripting a torrent tracker with twisted

Some weeks ago I learned about Twisted, an asynchronous networking library... something like node.js for Python.

Among the many protocols it supports (including SSH, both client and server) there is, of course, HTTP, which lets us get a dynamic web server running without configuring Apache, nginx or anything like that, just by doing

pip install twisted

And what web application can be built quickly on a web server? A BitTorrent tracker!

For those who don't know it, the role of a torrent tracker is to keep, for each torrent, the list of clients sharing it. Nowadays trackers are no longer strictly necessary thanks to DHT (Distributed Hash Table), which relies on the network's own clients to do this task, but even then, building one is still a small and useful example.

So let's go. The first step is to import the libraries and prepare the skeleton we will work on:

#!/usr/bin/env python
# coding: utf-8
from twisted.web import server, resource # Objects related to the HTTP protocol
from twisted.internet import reactor     # Twisted asynchronous engine

class Tracker(resource.Resource):
    '''Each object of this class will represent a tracker.'''
    isLeaf = True # This resource has no child resources; it handles requests itself.

    def render_GET(self, request):
      '''A GET request to the server
      (only GET will be used in this example, but the pattern is obvious, isn't it? :) )'''
      return "Testing..."


if __name__ == "__main__":
    # Initializing a tracker as a web server
    tracker = server.Site(Tracker())
    # Initializing the Twisted engine
    reactor.listenTCP(8080, tracker)
    # The server gets started
    reactor.run()

And... the Twisted part is done. If we run it we get a functioning web server; all that remains is to add the torrent "tracking".

Well, not quite. Twisted runs as a single-threaded event loop: if one request blocks (typically on blocking I/O), the WHOLE application stops until it returns. The solution to this issue is asynchronous calls using callbacks (and possibly closures), something also common in AJAX, for example. This is what we'll explore now.

Let's quickly define a couple of functions to handle Bencode (the encoding BitTorrent uses) and then return to the interesting part...

def bencode_dict(d):
    '''Bencodes a dictionary.
       Note that the keys must be sorted in plain
       lexicographical order, as the spec requires.'''
    bencoded = []
    for key in sorted(d.keys()):
        bencoded.append(bencode(key) + bencode(d[key]))

    return 'd' + (''.join(bencoded)) + 'e'


def bencode_list(l):
    '''Bencodes a list.'''
    bencoded = []
    for element in l:
        bencoded.append(bencode(element))

    return 'l' + (''.join(bencoded)) + 'e'


def bencode_int(n):
    '''Bencodes an integer.'''
    return 'i%ie' % n


def bencode_str(s):
    '''Bencodes a character string.'''
    return '%i:%s' % (len(s), s)


def bencode(o):
    '''Bencodes an object.'''
    if type(o) in (str, unicode): return bencode_str(o).encode('ascii')
    elif type(o) == int:          return bencode_int(o).encode('ascii')
    elif type(o) == list:         return bencode_list(o).encode('ascii')
    elif type(o) == dict:         return bencode_dict(o).encode('ascii')
    raise Exception('Invalid type ' + str(type(o)) )
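As a quick sanity check, here is the same logic condensed into a single self-contained function (a Python 3 flavoured sketch, handy for playing with the encoding outside the tracker):

```python
def bencode(o):
    # Minimal, self-contained version of the encoder above.
    if isinstance(o, str):
        return '%d:%s' % (len(o), o)
    if isinstance(o, int):
        return 'i%de' % o
    if isinstance(o, list):
        return 'l' + ''.join(bencode(e) for e in o) + 'e'
    if isinstance(o, dict):
        # Keys in plain lexicographical order, as the spec requires.
        return 'd' + ''.join(bencode(k) + bencode(o[k]) for k in sorted(o)) + 'e'
    raise TypeError('Invalid type %s' % type(o))

print(bencode({'interval': 1800, 'peers': []}))
# d8:intervali1800e5:peerslee
```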

Ok, back to the tracker. The protocol specifies some parameters to be passed to the tracker as GET request options (through the URL, after the '?'); they can be retrieved through render_GET's `request' argument.

def render_GET(self, request):
    '''A GET request to the server
    (only GET will be used in this example, but the pattern is obvious, isn't it? :) )'''

    # Request arguments
    args = request.args

    # Torrent identification
    info_hash = args.get('info_hash', [None])[0]

    # Client identification
    peer_id = args.get('peer_id', [None])[0]

    # Torrent client port
    port = args.get('port', [None])[0]

    # Total uploaded bytes (in base 10 ASCII), 0 by default
    uploaded = int(args.get('uploaded', [0])[0])

    # Total downloaded bytes (in base 10 ASCII), 0 by default
    downloaded = int(args.get('downloaded', [0])[0])

    # Remaining bytes to complete every file in the torrent (in base 10 ASCII).
    left = int(args.get('left', [0])[0])

    # Compact response option.
    compact = bool(int(args.get('compact', [False])[0]))

    # Peer id may be omitted in the response.
    no_peer_id = bool(int(args.get('no_peer_id', [False])[0]))

    # `started', `stopped' or `completed' (optional).
    event = args.get('event', [None])[0]

    # Client IP (optional).
    ip = args.get('ip', [request.getClientIP()])[0]

    # Number of peers on the response (optional), 50 by default.
    numwant = int(args.get('numwant', [50])[0])

    # Client key (for security reasons).
    key = args.get('key', [None])[0]

    # ID the tracker sent to the client in a previous response (optional).
    trackerid = args.get('trackerid', [None])[0]

    # We keep hold of the `request' object; we'll need it later.
    handle_request(request, info_hash, peer_id, port, uploaded, downloaded,
                   left, compact, no_peer_id, event, ip, numwant, key, trackerid)

    # The response will not be given yet
    return server.NOT_DONE_YET
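For reference, an announce from a client is just a plain GET with those parameters in the query string. A quick sketch building one (all the values here are made up; in reality info_hash and peer_id are raw 20-byte strings, URL-escaped by the client):

```python
from urllib.parse import urlencode  # `urllib.urlencode' on Python 2

# Hypothetical example values for illustration only.
params = {
    'info_hash': 'A' * 20,
    'peer_id': '-AB0001-123456789012',
    'port': 6881,
    'uploaded': 0,
    'downloaded': 0,
    'left': 12345,
    'compact': 1,
}
print('http://localhost:8080/announce?' + urlencode(params))
```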

Yep, that's a lot of lines just to extract the parameters, but this way everything is clear. Now for the interesting part: we need asynchronous I/O to access the database. To keep things simple we'll use SQLite (although performance may degrade with many clients). At this point there's a little problem... the sqlite3 Python module is completely synchronous, so the only solution is to write a class that wraps it and makes it asynchronous :P

import sqlite3
from threading import Thread

class AsyncSqlite(Thread):
    '''Achieves asynchrony by running the queries in a separate thread.'''

    def __init__(self, database, queries, callback):
        '''Constructor.'''
        Thread.__init__(self) # Initialize the thread

        self.database = database
        self.queries = queries
        self.callback = callback


    def run(self):
        '''Already in another thread: run the queries and invoke the callback with the results.'''

        conn = sqlite3.connect(self.database)
        c = conn.cursor()

        results = []
        for query, params in self.queries:
            c.execute(query, params)
            results.append(c.fetchall())

        conn.commit()
        # NOTE: the callback runs in this worker thread; in a real Twisted
        # app it should be bounced back with reactor.callFromThread().
        self.callback(results)
        conn.close()

This can be tested directly from a terminal:

>>> import os
>>> from async_sqlite import AsyncSqlite
>>> os.chdir('/usr/share/doc/basic256/examples/')
>>> def show(data): print '\n'.join(map(str, data[0]))
... # The prompt will come back faster than the results
>>> AsyncSqlite('quotations.sqlite3', [('select * from quotes', ())], show).start()
>>> (1, u'Abraham Lincoln (1809 - 1865)', u'Most folks are about as happy as they make up their minds to be.')
(2, u'George Burns (1896 - 1996)', u'Happiness is having a large, loving, caring, close-knit family in another city.')
(3, u'Jean Houston', u'At the height of laughter, the universe is flung into a kaleidoscope of new possibilities.')
(4, u'Doug Larson', u'Home computers are being called upon to perform many new functions, including the consumption of homework formerly eaten by the dog.')
(5, u'Isaac Asimov', u'I do not fear computers. I fear the lack of them.')
(6, u'Pierre Gallois', u'If you put tomfoolery into a computer, nothing comes out of it but tomfoolery. But this tomfoolery, having passed through a very expensive machine, is somehow ennobled and no-one dares criticize it.')
(7, u'Robert Orben', u'To err is human--and to blame it on a computer is even more so.')
(8, u'Herm Albright (1876 - 1944)', u'A positive attitude may not solve all your problems, but it will annoy enough people to make it worth the effort.')
(9, u'William James (1842 - 1910)', u'The greatest discovery of my generation is that a human being can alter his life by altering his attitudes of mind.')
(10, u'Martha Washington (1732 - 1802)', u'I am still determined to be cheerful and happy, in whatever situation I may be; for I have also learned from experience that the greater part of our happiness or misery depends upon our dispositions, and not upon our circumstances.')

>>>
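The same check as a plain script, for those who prefer it (a self-contained sketch: the class is repeated so the snippet runs on its own, and a throwaway database is created on the fly; the final join() exists only so the script doesn't exit before the worker finishes):

```python
import os
import sqlite3
import tempfile
from threading import Thread

class AsyncSqlite(Thread):
    '''Same class as above, repeated so this snippet is self-contained.'''
    def __init__(self, database, queries, callback):
        Thread.__init__(self)
        self.database = database
        self.queries = queries
        self.callback = callback

    def run(self):
        conn = sqlite3.connect(self.database)
        c = conn.cursor()
        results = []
        for query, params in self.queries:
            c.execute(query, params)
            results.append(c.fetchall())
        conn.commit()
        self.callback(results)
        conn.close()

# A throwaway database with a couple of rows.
path = os.path.join(tempfile.mkdtemp(), 'demo.sqlite3')
conn = sqlite3.connect(path)
conn.execute('CREATE TABLE quotes (id INTEGER, author TEXT)')
conn.executemany('INSERT INTO quotes VALUES (?, ?)',
                 [(1, 'Abraham Lincoln'), (2, 'George Burns')])
conn.commit()
conn.close()

def show(data):
    print(data[0])

worker = AsyncSqlite(path, [('SELECT * FROM quotes', ())], show)
worker.start()
worker.join()  # Only so the script doesn't exit before the callback fires.
```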

So, using this class...

import sqlite3
DB_NAME = 'tracker.sqlite3'
MAX_PEERS = 200

# Create the database if it doesn't exist
conn = sqlite3.connect(DB_NAME)
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS peers
             (info_hash BLOB(20), peer_id BLOB(20), ip VARCHAR(15), port INT, complete BOOLEAN)''')
conn.commit()
conn.close()


def handle_request(request, info_hash, peer_id, port, uploaded, downloaded,
                   left, compact, no_peer_id, event, ip, numwant, key, trackerid):
    '''Handles a request to the tracker asynchronously.'''

    def callback(data):
        '''Callback, won't do anything yet.'''
        print data

    binfo_hash = sqlite3.Binary(info_hash)
    AsyncSqlite(DB_NAME, (('''DELETE FROM peers
                                WHERE info_hash = ? AND peer_id = ?''',
                           (binfo_hash, peer_id)),
                          ('''INSERT INTO peers (info_hash, peer_id, ip, port, complete)
                                VALUES (?, ?, ?, ?, ?)''',
                           (binfo_hash, peer_id, ip, port, 1 if left == 0 else 0)),
                          ('''SELECT count(1) as seeders FROM peers
                                WHERE info_hash = ? AND complete = 1''', (binfo_hash,)),
                          ('''SELECT count(1) as leechers FROM peers
                                WHERE info_hash = ? AND complete = 0''', (binfo_hash,)),
                          ('''SELECT peer_id, ip, port FROM peers
                                WHERE info_hash = ? AND peer_id != ?
                                ORDER BY RANDOM() LIMIT ?''',
                           (binfo_hash, peer_id, min(numwant, MAX_PEERS)))),
                callback).start()

That part is done; only the callback remains. To test it, we can write the results just as they reach the callback and close the request:

def callback(data):
    '''Show the data (not in the format BitTorrent wants yet)
       and finish the request.'''
    request.write(str(data))
    request.finish()

Format the data and we're done:

def callback(data):
    '''Show the data Bittorrent needs.'''
    seeders = data[2][0][0]
    leechers = data[3][0][0]
    peers = data[4]

    # Compact format (needs `import struct' at the top of the file)
    if compact:
        peer_list = []
        for peer in peers: # The reduce() converts the dotted IP into an integer
            packed_ip = reduce(lambda x, y: (int(x) << 8) + int(y), peer[1].split("."))
            peer_list.append(struct.pack("!IH", packed_ip, int(peer[2]))) # '!' = big-endian

        peer_structure = ''.join(peer_list)

    else:
        peer_structure = []
        for peer in peers:
            new_peer = {'ip': peer[1],
                        'port': peer[2]}

            if (peer[0] is not None) and (not no_peer_id):
                new_peer['peer_id'] = peer[0]

            peer_structure.append(new_peer)

    request.setHeader('Content-Type', 'text/plain')
    request.write(bencode({'interval': 5 * 60, # Seconds to wait before the next request
                           'complete': seeders,
                           'incomplete': leechers,
                           'peers': peer_structure}))

    request.finish()
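The compact format packs each peer into exactly 6 bytes: 4 for the IP and 2 for the port, both big-endian (hence the '!' in the pack format). A standalone check, using socket.inet_aton instead of the reduce() trick:

```python
import socket
import struct

def compact_peer(ip, port):
    # 4 bytes of IP + 2 bytes of port, both in network (big-endian) byte order.
    return socket.inet_aton(ip) + struct.pack('!H', port)

packed = compact_peer('10.0.0.1', 6881)
print(repr(packed))  # 6881 == 0x1ae1, so the last two bytes are \x1a\xe1
```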

That's all, we have a functioning torrent tracker (although there are probably a lot of things to polish here and there...)