A few weeks ago I learned about Twisted, an asynchronous networking library... something
like node.js for Python.
Among the many protocols it supports (including SSH, both client
and server)
is, of course, HTTP,
which lets us get a dynamic web server running without configuring Apache, nginx or anything like that: just a few lines of Python.
And what web application can be built quickly on top of a web server?
A BitTorrent tracker!
For those who don't know, the role of a torrent tracker is to keep, for each torrent, the list of clients sharing it.
Nowadays trackers are no longer strictly necessary thanks to DHT (Distributed Hash Table),
which relies on the network's own clients to do this job, but even then, building one is still a small and useful example.
So let's get started. The first step is to import the libraries and set up the skeleton we'll work on:
#!/usr/bin/env python
# coding: utf-8
from twisted.web import server, resource  # Objects related to the HTTP protocol
from twisted.internet import reactor      # Twisted's asynchronous engine

class Tracker(resource.Resource):
    '''Each object of this class represents a tracker.'''
    isLeaf = True  # This resource has no children; it handles requests itself.

    def render_GET(self, request):
        '''Handles a GET request to the server
        (the only method used in this example, but the pattern is obvious, isn't it? :) )'''
        return "Testing..."

if __name__ == "__main__":
    # Wrap the tracker resource in a web server (a "site")
    tracker = server.Site(Tracker())
    # Tell the Twisted reactor to listen on port 8080
    reactor.listenTCP(8080, tracker)
    # Start the server
    reactor.run()
And... that's the Twisted part done. If we run it we get a working web server; all that remains is to add the torrent "tracking".
Well, not quite: Twisted runs everything in a single event loop, so if a request blocks (typically on blocking I/O),
the WHOLE application stops until it returns. The solution to this issue is asynchronous calls using callbacks
(and possibly closures),
something also common in AJAX, for example; this is what we'll explore now.
Let's quickly define a couple of functions to handle Bencode (the encoding BitTorrent uses)
and then get back to the interesting part...
def bencode_dict(d):
    '''Bencodes a dictionary.
    Note that the keys must be sorted as raw strings (lexicographically).'''
    bencoded = []
    for key in sorted(d.keys()):
        bencoded.append(bencode(key) + bencode(d[key]))
    return 'd' + ''.join(bencoded) + 'e'

def bencode_list(l):
    '''Bencodes a list.'''
    bencoded = []
    for element in l:
        bencoded.append(bencode(element))
    return 'l' + ''.join(bencoded) + 'e'

def bencode_int(n):
    '''Bencodes an integer.'''
    return 'i%ie' % n

def bencode_str(s):
    '''Bencodes a character string.'''
    return '%i:%s' % (len(s), s)

def bencode(o):
    '''Bencodes an object, dispatching on its type.'''
    if type(o) in (str, unicode): return bencode_str(o).encode('ascii')
    elif type(o) == int: return bencode_int(o).encode('ascii')
    elif type(o) == list: return bencode_list(o).encode('ascii')
    elif type(o) == dict: return bencode_dict(o).encode('ascii')
    raise Exception('Invalid type ' + str(type(o)))
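As a quick sanity check of the encoding rules, here is a minimal Python 3 re-implementation of the encoder (the tracker code above targets Python 2) together with the outputs it produces:

```python
# Minimal Python 3 sketch of the same bencoding rules, for checking the format by hand.
def bencode(o):
    if isinstance(o, str):
        return '%i:%s' % (len(o), o)        # <length>:<string>
    elif isinstance(o, int):
        return 'i%ie' % o                   # i<integer>e
    elif isinstance(o, list):
        return 'l' + ''.join(bencode(e) for e in o) + 'e'
    elif isinstance(o, dict):               # keys in sorted (raw string) order
        return 'd' + ''.join(bencode(k) + bencode(o[k]) for k in sorted(o)) + 'e'
    raise TypeError('Invalid type ' + str(type(o)))

print(bencode('spam'))                # 4:spam
print(bencode(42))                    # i42e
print(bencode(['a', 'b']))            # l1:a1:be
print(bencode({'spam': ['a', 'b']}))  # d4:spaml1:a1:bee
```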
OK, back to the tracker. The protocol specifies
some parameters to be passed to the tracker as GET
request options (in the URL, after the '?'); they can be retrieved from render_GET's `request' argument.
def render_GET(self, request):
    '''Handles a GET request to the server
    (the only method used in this example, but the pattern is obvious, isn't it? :) )'''
    # Request arguments
    args = request.args
    # Torrent identifier
    info_hash = args.get('info_hash', [None])[0]
    # Client identifier
    peer_id = args.get('peer_id', [None])[0]
    # Port the torrent client listens on
    port = args.get('port', [None])[0]
    # Total uploaded bytes (base-10 ASCII), 0 by default
    uploaded = int(args.get('uploaded', [0])[0])
    # Total downloaded bytes (base-10 ASCII), 0 by default
    downloaded = int(args.get('downloaded', [0])[0])
    # Bytes remaining to complete every file in the torrent (base-10 ASCII)
    left = args.get('left', [None])[0]
    # Compact response option
    compact = bool(int(args.get('compact', [0])[0]))
    # The peer id may be omitted from the response
    no_peer_id = bool(int(args.get('no_peer_id', [0])[0]))
    # 'started', 'stopped' or 'completed' (optional)
    event = args.get('event', [None])[0]
    # Client IP (optional)
    ip = args.get('ip', [request.getClientIP()])[0]
    # Number of peers wanted in the response (optional), 50 by default
    numwant = int(args.get('numwant', [50])[0])
    # Client key (for security reasons)
    key = args.get('key', [None])[0]
    # ID this tracker sent to the client in a previous request (optional)
    trackerid = args.get('trackerid', [None])[0]
    # We hold on to the `request' object; we'll need it later
    handle_request(request, info_hash, peer_id, port, uploaded, downloaded,
                   left, compact, no_peer_id, event, ip, numwant, key, trackerid)
    # The response is not ready yet
    return server.NOT_DONE_YET
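The reason every parameter ends in `[0]` is that Twisted maps each query key to a *list* of values (a parameter may appear more than once in the URL), just like the standard library's `parse_qs` does:

```python
from urllib.parse import parse_qs  # Python 3 stdlib; Twisted's request.args has the same shape

# Every key maps to a list of values
args = parse_qs('port=6881&uploaded=0&numwant=50')
print(args)             # {'port': ['6881'], 'uploaded': ['0'], 'numwant': ['50']}
print(args['port'][0])  # '6881' (still a string; numeric fields need int())
```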
Yep, that's a lot of lines just to extract the parameters, but this way everything is clear. Now for the interesting part:
we need asynchronous I/O to access the database; to keep things simple we'll use SQLite
(although performance may degrade with enough clients).
At this point there's a small problem... the sqlite3
Python module is completely synchronous, so the only solution is to write a class that wraps it and makes it asynchronous :P
import sqlite3
from threading import Thread

class AsyncSqlite(Thread):
    '''Achieves asynchrony by running each batch of queries in its own thread.'''
    def __init__(self, database, queries, callback):
        '''Constructor. `queries' is a sequence of (sql, params) tuples.'''
        Thread.__init__(self)  # Initialize the thread
        self.database = database
        self.queries = queries
        self.callback = callback

    def run(self):
        '''Already in the other thread: run the queries and invoke the
        callback with the list of results as its argument.'''
        conn = sqlite3.connect(self.database)
        c = conn.cursor()
        results = []
        for query, params in self.queries:
            c.execute(query, params)
            results.append(c.fetchall())
        conn.commit()
        conn.close()
        self.callback(results)
This can be tested directly in an interactive session:
>>> import os
>>> from async_sqlite import AsyncSqlite
>>> os.chdir('/usr/share/doc/basic256/examples/')
>>> def show(data): print '\n'.join(map(str, data[0]))
... # The prompt will come back before the results do
>>> AsyncSqlite('quotations.sqlite3', [('select * from quotes', ())], show).start()
>>> (1, u'Abraham Lincoln (1809 - 1865)', u'Most folks are about as happy as they make up their minds to be.')
(2, u'George Burns (1896 - 1996)', u'Happiness is having a large, loving, caring, close-knit family in another city.')
(3, u'Jean Houston', u'At the height of laughter, the universe is flung into a kaleidoscope of new possibilities.')
(4, u'Doug Larson', u'Home computers are being called upon to perform many new functions, including the consumption of homework formerly eaten by the dog.')
(5, u'Isaac Asimov', u'I do not fear computers. I fear the lack of them.')
(6, u'Pierre Gallois', u'If you put tomfoolery into a computer, nothing comes out of it but tomfoolery. But this tomfoolery, having passed through a very expensive machine, is somehow ennobled and no-one dares criticize it.')
(7, u'Robert Orben', u'To err is human--and to blame it on a computer is even more so.')
(8, u'Herm Albright (1876 - 1944)', u'A positive attitude may not solve all your problems, but it will annoy enough people to make it worth the effort.')
(9, u'William James (1842 - 1910)', u'The greatest discovery of my generation is that a human being can alter his life by altering his attitudes of mind.')
(10, u'Martha Washington (1732 - 1802)', u'I am still determined to be cheerful and happy, in whatever situation I may be; for I have also learned from experience that the greater part of our happiness or misery depends upon our dispositions, and not upon our circumstances.')
>>>
So, using this class...
import sqlite3

DB_NAME = 'tracker.sqlite3'
MAX_PEERS = 200

# Create the database if it doesn't exist
conn = sqlite3.connect(DB_NAME)
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS peers
             (info_hash BLOB(20), peer_id BLOB(20), ip VARCHAR(15), port INT, complete BOOLEAN)''')
conn.commit()
conn.close()

def handle_request(request, info_hash, peer_id, port, uploaded, downloaded,
                   left, compact, no_peer_id, event, ip, numwant, key, trackerid):
    '''Prepares the response to a tracker request.'''
    def callback(data):
        '''Callback; does nothing useful yet.'''
        print data
    binfo_hash = sqlite3.Binary(info_hash)
    AsyncSqlite(DB_NAME, (('''DELETE FROM peers
                              WHERE info_hash = ? AND peer_id = ?''',
                           (binfo_hash, peer_id)),
                          ('''INSERT INTO peers (info_hash, peer_id, ip, port, complete)
                              VALUES (?, ?, ?, ?, ?)''',
                           # `left' arrives as a base-10 ASCII string
                           (binfo_hash, peer_id, ip, port, 1 if left == '0' else 0)),
                          ('''SELECT count(1) as seeders FROM peers
                              WHERE complete = 1 AND info_hash = ?''', (binfo_hash,)),
                          ('''SELECT count(1) as leechers FROM peers
                              WHERE complete = 0 AND info_hash = ?''', (binfo_hash,)),
                          ('''SELECT peer_id, ip, port FROM peers
                              WHERE info_hash = ? AND peer_id != ?
                              ORDER BY RANDOM() LIMIT ?''',
                           (binfo_hash, peer_id, min(numwant, MAX_PEERS)))),
                callback).start()
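To see the shape of the `data` argument the callback will receive (one list of rows per query, in the order the queries were given), the same batch pattern can be run synchronously against an in-memory database; the table and values here are made up for illustration:

```python
import sqlite3

conn = sqlite3.connect(':memory:')
c = conn.cursor()
c.execute('CREATE TABLE peers (peer_id TEXT, complete INT)')
c.execute("INSERT INTO peers VALUES ('peer1', 1)")
c.execute("INSERT INTO peers VALUES ('peer2', 0)")

# Same (sql, params) batch shape that AsyncSqlite consumes
queries = (('SELECT count(1) FROM peers WHERE complete = 1', ()),
           ('SELECT count(1) FROM peers WHERE complete = 0', ()),
           ('SELECT peer_id FROM peers ORDER BY peer_id', ()))
results = []
for query, params in queries:
    c.execute(query, params)
    results.append(c.fetchall())

print(results[0][0][0])  # 1 (seeders)
print(results[1][0][0])  # 1 (leechers)
print(results[2])        # [('peer1',), ('peer2',)]
```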
That part is done; only the callback remains. To test it, we can write the results exactly as they reach the callback and close the request:
def callback(data):
    '''Dump the raw data (not yet in the shape BitTorrent wants)
    and finish the request.'''
    request.write(str(data))
    request.finish()
Format the data and we're done:
import struct

def callback(data):
    '''Send back the data in the shape BitTorrent needs.'''
    seeders = data[2][0][0]
    leechers = data[3][0][0]
    peers = data[4]
    # Compact format
    if compact:
        peer_list = []
        for peer in peers:  # The reduce() converts the dotted IP to its integer representation
            ip = reduce(lambda x, y: (int(x) << 8) + int(y), peer[1].split("."))
            # '!' forces network (big-endian) byte order, as the spec requires
            peer_list.append(struct.pack("!IH", ip, int(peer[2])))
        peer_structure = ''.join(peer_list)
    else:
        peer_structure = []
        for peer in peers:
            new_peer = {'ip': peer[1],
                        'port': peer[2]}
            if (peer[0] is not None) and (not no_peer_id):
                new_peer['peer_id'] = peer[0]
            peer_structure.append(new_peer)
    request.setHeader('Content-Type', 'text/plain')
    request.write(bencode({'interval': 5 * 60,  # Seconds to wait before the next request
                           'complete': seeders,
                           'incomplete': leechers,
                           'peers': peer_structure}))
    request.finish()
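As a sanity check of the compact peer format (6 bytes per peer: 4 for the IP, 2 for the port, both big-endian), here is the packing step in isolation; the IP and port are made-up values:

```python
import struct
from functools import reduce  # a builtin in Python 2, in functools on Python 3

# Convert a dotted-quad IP to its 32-bit integer representation
ip = reduce(lambda x, y: (int(x) << 8) + int(y), '10.0.0.1'.split('.'))
packed = struct.pack('!IH', ip, 6881)  # '!' = network (big-endian) byte order

print(ip)            # 167772161
print(len(packed))   # 6
print(repr(packed))  # b'\n\x00\x00\x01\x1a\xe1'
```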
That's all: we've got a working torrent tracker (although there's probably plenty to polish here and there...)