Вот мой скриптец, которым я делаю резервную копию блогов на Тумблере. Скрипт написан на Питоне и работает в Линуксе (про Виндовс и др. не знаю, теоретически должен работать, но я сам не проверял). Скрипт умеет:
- “высасывать” (export) содержимое блога (как открытого, так и приватного; записи — все, включая приватные) в XML-файл (картинки, видео и т.п. не экспортируются!);
- “заливать” (import) полученный XML-файл в любой блог (на который у вас есть права, разумеется). Внимание! В Tumblr API существует ограничение на количество загружаемых фотографий в час (зависит от загрузки сайта, в среднем примерно 50 фото в час). Поэтому скрипт, получая “отлуп” от сервера, приостанавливается и ждет некоторое время, после чего повторяет попытку;
- чистить (clean) указанный блог — скрипт удаляет все содержимое (на самом деле, остаются приватные записи, но, если их немного, можно удалить потом руками).
Код скрипта приведен ниже.
#!/usr/bin/python
# -*- coding: utf-8 -*-
'''tumbackup.py: tumblr export/import utility
Tumblr API description: http://www.tumblr.com/docs/api
Usage: [python] tumbackup.py {import|export|clean} email password blog_name xml-file
'''
import sys
import urllib
import re
from sgmllib import SGMLParser
def u(s):
    '''Decode the UTF-8 byte string ``s`` into text.'''
    return s.decode('utf-8')


def s(s):
    '''Encode the text ``s`` as a UTF-8 byte string.'''
    return s.encode('utf-8')
class TumblrError(Exception):
    '''Base class for every error this script raises about the Tumblr API.'''
class LimitExceeded(TumblrError):
    '''Raised when the server refuses a write because the upload limit is exceeded.'''
class TumblrURLopener(urllib.FancyURLopener):
    '''URL opener that reports Tumblr API HTTP failures as TumblrError.'''

    def http_error_default(self, url, fp, errcode, errmsg, headers, data=None):
        '''Translate an HTTP status handed to the error hook.

        Returns the response object ``fp`` unchanged for status 201; raises
        TumblrError with a status-specific message for everything else.
        '''
        # 201 is not a failure, so the response is handed back as-is.
        if errcode == 201:
            return fp
        if errcode == 403:
            raise TumblrError('Access forbidden! Check your email and password.')
        if errcode == 400:
            # For a 400 the response body itself is used as the error text.
            raise TumblrError(fp.read())
        raise TumblrError('%s: %s' % (errcode, errmsg))
# Install the custom opener module-wide: urllib.urlopen() goes through
# urllib._urlopener when it is set, so every request made below reports
# HTTP errors as TumblrError.
urllib._urlopener = TumblrURLopener()
class TumblrAPI(object):
    '''Small client for the Tumblr HTTP API: read, write, authenticate, delete.

    The credentials are kept in ``self.auth`` (keys ``email`` and
    ``password``) and are sent as form fields with every authenticated call.
    '''
    URL_READ = 'http://%s.tumblr.com/api/read'
    URL_WRITE = 'http://www.tumblr.com/api/write'
    URL_AUTH = 'http://www.tumblr.com/api/authenticate'
    URL_DEL = 'http://www.tumblr.com/api/delete'

    def __init__(self, email, password):
        self.auth = {'email': email, 'password': password}

    def _request(self, url, data=None):
        '''Open ``url`` and return the response body.

        A non-empty ``data`` dict is form-encoded and sent as the request
        body; otherwise a plain request is made. The response object is
        closed even when reading fails.
        '''
        if data:
            fd = urllib.urlopen(url, urllib.urlencode(data))
        else:
            fd = urllib.urlopen(url)
        try:
            return fd.read()
        finally:
            fd.close()

    def authenticate(self):
        '''Send the credentials to the authenticate URL and return the reply.'''
        return self._request(self.URL_AUTH, self.auth)

    def _read(self, group, args=None, data=None):
        '''Fetch the read URL of blog ``group`` and return the response body.

        ``args`` holds the query parameters (may be None or empty); when any
        are given and ``filter`` is not among them, ``filter=none`` is added.
        ``data``, if non-empty, is sent as form fields (used for credentials).
        '''
        url = self.URL_READ % (group)
        # Work on a copy so that None is accepted and the caller's dict is
        # left untouched.
        params = dict(args or {})
        if params:
            params.setdefault('filter', 'none')
            # urlencode() escapes the values, unlike plain string joining.
            url += '?' + urllib.urlencode(params)
        if data:
            return self._request(url, data)
        return self._request(url)

    def read(self, group, **kargs):
        '''Read blog ``group`` without credentials; keywords become query args.'''
        return self._read(group, kargs)

    def auth_read(self, group, **kargs):
        '''Read blog ``group`` with credentials; keywords become query args.'''
        return self._read(group, kargs, self.auth)

    def _write(self, data):
        '''Send one write request and return the response body.

        Raises LimitExceeded when the server's error text mentions an
        exceeded upload limit. Any other TumblrError is reported on stderr
        and None is returned, so one rejected post does not stop the caller.
        '''
        encoded = urllib.urlencode(data)
        try:
            fd = urllib.urlopen(self.URL_WRITE, encoded)
            try:
                return fd.read()
            finally:
                fd.close()
        except TumblrError as detail:
            message = str(detail)
            if 'exceeded' in message and 'upload limit' in message:
                raise LimitExceeded(message)
            # Previously these failures vanished without a trace; keep going,
            # but make them visible.
            sys.stderr.write('**** Post rejected: %s\n' % (message))
            return None

    def write(self, data):
        '''Post ``data`` with the credentials added and return the reply.

        Returns the response body of the write call (per the original
        author, the new post ID), or None when the server rejected the post.
        While the server reports an exceeded upload limit, the call sleeps
        and retries, doubling the delay (1, 2, 4, ... minutes) each time.
        The caller's ``data`` dict is not modified.
        '''
        from time import sleep
        # Copy first so the credentials do not leak into the caller's dict.
        payload = dict(data)
        payload.update(self.auth)
        delay = 1
        while True:
            try:
                return self._write(payload)
            except LimitExceeded:
                sys.stderr.write('**** Upload limit exceeded. Waiting %s min...\n' % (delay))
                sleep(delay * 60)
                delay *= 2

    def delete(self, postID):
        '''Ask the server to delete the post with ID ``postID``.'''
        data = dict(self.auth)
        data['post-id'] = postID
        # Going through _request() reads and closes the response.
        self._request(self.URL_DEL, data)
# Maps the name of an element found inside a <post> of the exported XML to the
# key under which TumblrContentHandler stores its text in the post dict; that
# dict is what ImportBlog passes to TumblrAPI.write().
# NOTE(review): 'PLAYER' does not look like a field the write API accepts;
# presumably it is a placeholder for player markup that cannot be re-posted
# -- confirm against the API description.
tag_map = {
'regular-title': 'title',
'regular-body': 'body',
'link-text': 'name',
'link-url': 'url',
'link-description': 'description',
'quote-text': 'quote',
'quote-source': 'source',
'photo-caption': 'caption',
'conversation-title': 'title',
'conversation-text': 'conversation',
'video-caption': 'caption',
'video-source': 'embed',
'video-player': 'PLAYER',
'audio-caption': 'caption',
'audio-player': 'PLAYER',
}
class TumblrContentHandler(SGMLParser):
    '''Turn exported post XML into dicts and pass each one to ``callback``.

    For every <post> element a dict is built with the keys ``type``,
    ``date``, ``format``, ``tags`` (comma-separated string), optionally
    ``private`` (1), ``source`` (text of the <photo-url> whose max-width is
    500) and one key per child element listed in the module-level
    ``tag_map``. The callback is invoked when the post's end tag is seen.
    '''

    def __init__(self, callback):
        SGMLParser.__init__(self)
        self.post = None        # dict of the <post> being read, None outside one
        self.curtag = None      # name of the open child element, None between elements
        self.callback = callback
        self.photo = False      # True inside the <photo-url> used as photo source

    def _collecting(self):
        '''Return True while incoming text belongs to a field that is stored.'''
        if self.post is None or self.curtag is None:
            return False
        if self.curtag == 'photo-url':
            return self.photo
        return self.curtag == 'tag' or self.curtag in tag_map

    def _markup(self, name, attrs):
        '''Rebuild the text of a start tag from its name and attribute pairs.'''
        parts = [name]
        for key, value in attrs:
            # The parser hands over decoded values, so escape them again.
            value = value.replace('&', '&amp;').replace('"', '&quot;')
            parts.append('%s="%s"' % (key, value))
        return '<%s>' % (' '.join(parts))

    def unknown_starttag(self, name, attrs):
        dattrs = dict(attrs)
        if name == 'post':
            self.post = {
                'type': dattrs['type'],
                'date': dattrs['date'],
                'format': dattrs['format'],
                'tags': [],
            }
            if dattrs.get('private') == 'true':
                self.post['private'] = 1
            self.curtag = None
            self.photo = False
            return
        if self.post is None:
            return
        if self._collecting():
            # Markup nested in a field (e.g. HTML written unescaped into a
            # body) is part of the field's text, not a new field.
            self.handle_data(self._markup(name, attrs))
            return
        self.curtag = name
        if name == 'tag':
            # Start a new tag; handle_data() extends it chunk by chunk.
            self.post['tags'].append('')
        elif name == 'photo-url':
            # A missing max-width simply means "not the 500px variant".
            self.photo = dattrs.get('max-width') == '500'
            if self.photo:
                # A later 500px URL replaces an earlier one.
                self.post['source'] = ''

    def unknown_endtag(self, name):
        if name == 'post':
            if self.post is not None:
                tags = [tag for tag in self.post['tags'] if tag]
                self.post['tags'] = ', '.join(tags)
                self.callback(self.post)
            self.post = None
        elif name != self.curtag and self._collecting():
            # End of markup nested inside the current field.
            self.handle_data('</%s>' % (name))
            return
        # The element is closed: text that follows (such as whitespace
        # between elements) must not be added to it.
        self.curtag = None
        self.photo = False

    def handle_data(self, content):
        # The parser delivers the text of one element in several chunks when
        # it contains entity references, so every field is accumulated.
        if not self._collecting():
            return
        if self.curtag == 'tag':
            self.post['tags'][-1] += content
        elif self.curtag == 'photo-url':
            self.post['source'] = self.post.get('source', '') + content
        else:
            field = tag_map[self.curtag]
            self.post[field] = self.post.get(field, '') + content

    def unknown_charref(self, ref):
        # Keep character references the parser cannot decode instead of
        # dropping them.
        self.handle_data('&#%s;' % (ref))

    def unknown_entityref(self, ref):
        # Same for named entities the parser does not know.
        self.handle_data('&%s;' % (ref))
class SimpleContentHandler(SGMLParser):
    '''Copy every <post> element fed to the parser into ``fd``, one per line.

    Everything outside <post> elements is skipped. Entity and character
    references are written back exactly as references, and attribute values
    are escaped again, so the output stays parseable.
    '''

    def __init__(self, fd):
        SGMLParser.__init__(self)
        self.fd = fd
        self.postCount = 0    # posts written since the last resetPostCount()
        self.ok = False       # True while inside a <post> element
        self.post = ''        # text of the post collected so far

    def _starttag(self, name, attrs):
        '''Rebuild the text of a start tag from its name and attribute pairs.'''
        parts = [name]
        for key, value in attrs:
            # The parser hands over decoded values, so escape them again.
            value = value.replace('&', '&amp;').replace('<', '&lt;')
            value = value.replace('"', '&quot;')
            parts.append('%s="%s"' % (key, value))
        return '<%s>' % (' '.join(parts))

    def start_post(self, attrs):
        self.ok = True
        self.post = self._starttag('post', attrs)

    def unknown_starttag(self, name, attrs):
        if self.ok:
            self.post += self._starttag(name, attrs)

    def end_post(self):
        self.post += '</post>'
        self.ok = False
        self.fd.write(self.post + '\n')
        self.postCount += 1

    def unknown_endtag(self, name):
        if self.ok:
            self.post += '</%s>' % (name)

    def handle_data(self, content):
        if self.ok:
            self.post += content

    def handle_entityref(self, name):
        # The default implementation decodes the entity and passes the result
        # to handle_data(), which would put a raw '<' or '&' into the file.
        if self.ok:
            self.post += '&%s;' % (name)

    def handle_charref(self, name):
        # Write character references back unchanged as well.
        if self.ok:
            self.post += '&#%s;' % (name)

    def getPostCount(self):
        '''Return the number of posts written since the last reset.'''
        return self.postCount

    def resetPostCount(self):
        '''Set the post counter back to zero.'''
        self.postCount = 0
class IdList(SGMLParser):
    '''Collect the ``id`` attribute of every <post> element fed to the parser.'''

    def __init__(self):
        SGMLParser.__init__(self)
        self.postCount = 0    # <post> elements seen since the last reset
        self.data = []        # IDs collected so far, in document order

    def start_post(self, attrs):
        self.postCount += 1
        # Only the first ``id`` attribute of the element is recorded.
        found = [value for key, value in attrs if key == 'id']
        if found:
            self.data.append(found[0])

    def getPostCount(self):
        '''Return the number of posts seen since the last reset.'''
        return self.postCount

    def resetPostCount(self):
        '''Set the post counter back to zero (the collected IDs are kept).'''
        self.postCount = 0

    def getIdList(self):
        '''Return the list of collected post IDs.'''
        return self.data
def ExportBlog(email, password, name, fname):
    '''Download all posts of blog ``name`` and write them to file ``fname``.

    Posts are fetched with credentials, 50 per request, until a request
    yields no posts. The file receives an XML header, a <posts> root and one
    <post> element per line. The file is closed even if a request fails.
    '''
    api = TumblrAPI(email, password)
    page_size = 50
    fd = open(fname, 'w')
    try:
        fd.write('<?xml version="1.0" encoding="UTF-8"?>\n')
        fd.write('<posts>\n')
        print('Fetching posts from %s' % (name,))
        count = 0
        parser = SimpleContentHandler(fd)
        while True:
            data = api.auth_read(name, start=count, num=page_size).strip()
            parser.feed(data)
            sys.stderr.write('.')    # one dot of progress per request
            fetched = parser.getPostCount()
            if fetched == 0:
                break
            # The number of posts received so far is the next start offset.
            count += fetched
            parser.resetPostCount()
        print('\nGot %s posts' % (count,))
        fd.write('</posts>\n')
    finally:
        fd.close()
def ImportBlog(email, password, name, fname):
    '''Post every entry of the exported file ``fname`` to blog ``name``.

    The posts are written in the reverse of their order in the file. A dot
    is printed to stderr after every 50 posts.
    '''
    api = TumblrAPI(email, password)
    postList = []
    print('Reading posts from %s' % (fname,))
    parser = TumblrContentHandler(postList.append)
    fd = open(fname)
    try:
        parser.feed(fd.read())
    finally:
        fd.close()
    # Flush whatever the parser is still holding back at end of input.
    parser.close()
    print('Importing posts to %s' % (name,))
    count = 0
    for post in reversed(postList):
        post['group'] = name
        api.write(post)
        count += 1
        if count % 50 == 0:
            sys.stderr.write('.')
    print('\nDone')
def Auth(email, password):
    '''Send the credentials to the API and print the server's reply.'''
    reply = TumblrAPI(email, password).authenticate()
    print(reply)
def Clean(email, password, name):
    '''Delete the posts of blog ``name``.

    First the post IDs are collected, 50 posts per request, until a request
    yields no posts; then each collected ID is deleted in turn.
    '''
    api = TumblrAPI(email, password)
    print('Fetching posts from %s' % (name,))
    collector = IdList()
    page_size = 50
    total = 0
    while True:
        xml = api.auth_read(name, start=total, num=page_size).strip()
        collector.feed(xml)
        sys.stderr.write('.')    # one dot of progress per request
        seen = collector.getPostCount()
        if seen == 0:
            break
        total += seen
        collector.resetPostCount()
    print('\nGot %s posts' % (total,))
    print('Deleting %s posts from %s' % (total, name))
    for post_id in collector.getIdList():
        api.delete(post_id)
    print('Done.')
if __name__ == '__main__':
    # Number of command-line values each command needs, the command name
    # included: export/import use all five, clean has no use for the file
    # name, auth needs only the credentials.
    required = {'export': 5, 'import': 5, 'clean': 4, 'auth': 3}
    argv = sys.argv[1:]
    if not argv or len(argv) < required.get(argv[0], 5):
        sys.stderr.write(__doc__)
        sys.exit(1)
    # Pad the short forms and ignore surplus values so unpacking cannot fail.
    cmd, email, password, blogName, fname = (argv + [None, None])[:5]
    if cmd == 'export':
        ExportBlog(email, password, blogName, fname)
    elif cmd == 'import':
        ImportBlog(email, password, blogName, fname)
    elif cmd == 'auth':
        Auth(email, password)
    elif cmd == 'clean':
        Clean(email, password, blogName)
    else:
        sys.stderr.write('Unknown command "%s"\n' % (cmd))
        # Report the failure to the calling shell as well.
        sys.exit(1)