November 23, 2009
Tumblr Backup

Вот мой скриптец, которым я делаю резервную копию блогов на Тумблере. Скрипт написан на Питоне и работает в Линуксе (про Виндовс и др. не знаю, теоретически должен работать, но я сам не проверял). Скрипт умеет:

  • “высасывать” (export) содержимое блога (как открытого, так и приватного, записи — все, включая приватные) в XML-файл (картинки, видео и т.п. не экспортируются!);
  • “заливать” (import) полученный XML-файл в любой блог (на который у вас есть права, разумеется). Внимание! В Tumblr API существует ограничение на количество загружаемых фотографий в час (зависит от загрузки сайта, в среднем примерно 50 фото в час). Поэтому скрипт, получая “отлуп” от сервера, приостанавливается и ждет некоторое время, после чего повторяет попытку;
  • чистить (clean) указанный блог — скрипт удаляет все содержимое (на самом деле, остаются приватные записи, но, если их немного, можно удалить потом руками).

Код скрипта приведен ниже.

#!/usr/bin/python
# -*- coding: utf-8 -*-
'''tumbackup.py: tumblr export/import utility

Tumblr API description: http://www.tumblr.com/docs/api

Usage: [python] tumbackup.py {import|export|clean|auth} email password blog_name xml-file

All five arguments must be given for every command: "clean" ignores xml-file,
"auth" uses only email and password.
'''

import sys
import urllib
import re
from sgmllib import SGMLParser

def u(s):
    '''Decode the UTF-8 encoded byte string s.'''
    return s.decode('utf-8')


def s(s):
    '''Encode the text s as UTF-8.'''
    return s.encode('utf-8')

class TumblrError(Exception):
    '''Error raised when a Tumblr API request fails.'''

class LimitExceeded(TumblrError):
    '''Raised when a write is rejected because the upload limit was exceeded.'''

class TumblrURLopener(urllib.FancyURLopener):
    '''URL opener that converts HTTP error responses into TumblrError.
    '''
    def http_error_default(self, url, fp, errcode, errmsg, headers, data=None):
        '''Handle a response whose status code has no dedicated handler.

        A 201 status is treated as success and the response object fp is
        returned to the caller unchanged.  Any other status raises
        TumblrError: 403 gets a fixed "access forbidden" message, 400 uses
        the response body as the message, everything else is reported as
        "<code>: <reason>".
        '''
        if errcode == 201:
            return fp
        # Always drain and close the response before raising, otherwise the
        # connection stays open for every failed request.
        try:
            body = fp.read()
        finally:
            fp.close()
        if errcode == 403:
            errmsg = 'Access forbidden! Check your email and password.'
        elif errcode == 400:
            errmsg = body
        else:
            errmsg = '%s: %s' % (errcode, errmsg)
        raise TumblrError(errmsg)

# Install the custom opener as urllib's module-wide default so that plain
# urllib.urlopen() calls in this file report HTTP errors as TumblrError.
urllib._urlopener = TumblrURLopener()

class TumblrAPI(object):
    '''
    '''
    URL_READ = 'http://%s.tumblr.com/api/read'
    URL_WRITE = 'http://www.tumblr.com/api/write'
    URL_AUTH = 'http://www.tumblr.com/api/authenticate'
    URL_DEL = 'http://www.tumblr.com/api/delete'

    def __init__(self, email, password):
        self.auth = dict()
        self.auth['email'] = email
        self.auth['password'] = password

    def _request(self, url, data=None):
#         sys.stderr.write('**** URLOPEN: %s\n' % (url))
        if data:
            encoded = urllib.urlencode(data)
#             sys.stderr.write('     ENCODED: %s\n' % (encoded))
            fd = urllib.urlopen(url, encoded)
        else:
            fd = urllib.urlopen(url)
        content = fd.read()
        fd.close()
        return content

    def authenticate(self):
        return self._request(self.URL_AUTH, self.auth)

    def _read(self, group, args=None, data=None):
        url = self.URL_READ % (group)
        if args and not args.has_key('filter'):
            args['filter'] = 'none'
            sargs = '&'.join(['%s=%s' % (k, v) for k, v in args.items()])
            if sargs:
                url += '?' + sargs
#         sys.stderr.write('**** READING URL %s\n' % (url))
        if data:
#             sys.stderr.write('     DATA: %s\n' % (str(data)))
            return self._request(url, data)
        else:
            return self._request(url)

    def read(self, group, **kargs):
        return self._read(group, kargs)

    def auth_read(self, group, **kargs):
        return self._read(group, kargs, self.auth)

    def _write(self, data):
#         if data['type'] == 'regular' and len(data['title'] + data['body']) == 0:
#             TumblrError('Text posts require either a body field or title field')
#         if data['type'] == 'quote' and len(data['quote']) == 0:
#             TumblrError('Quote posts require quote text to send!')
#         if data['type'] == 'photo' and len(data['data']) == 0:
#             TumblrError('Photo posts reqire image data to send!')
        encoded = urllib.urlencode(data)
        try:
            fd = urllib.urlopen(self.URL_WRITE, encoded)
            return fd.read()
        except TumblrError, detail:
            s = str(detail)
            if s.find('exceeded') >= 0 and s.find('upload limit') >= 0:
                raise LimitExceeded

    def write(self, data):
        '''Post the data and return new post ID if success
        Raise TumblrError on errors
        '''
        from time import sleep
        data.update(self.auth)
        delay = 1
        while True:
            try:
                result = self._write(data)
                return result
            except LimitExceeded:
                sys.stderr.write('**** Upload limit exceeded. Waiting %s min...\n' % (delay))
                sleep(delay * 60)
                delay *= 2


    def delete(self, postID):
        data = dict()
        data.update(self.auth)
        data['post-id'] = postID
        encoded = urllib.urlencode(data)
        urllib.urlopen(self.URL_DEL, encoded)

# Translation table used while importing: keys are names of elements found
# inside a <post> of the XML file, values are the keys under which
# TumblrContentHandler stores their text in the post dict that is later
# passed to TumblrAPI.write().
# NOTE(review): 'PLAYER' does not look like a regular write field --
# presumably a placeholder for player markup that is not re-posted; confirm.
tag_map = {
    'regular-title':      'title',
    'regular-body':       'body',
    'link-text':          'name',
    'link-url':           'url',
    'link-description':   'description',
    'quote-text':         'quote',
    'quote-source':       'source',
    'photo-caption':      'caption',
    'conversation-title': 'title',
    'conversation-text':  'conversation',
    'video-caption':      'caption',
    'video-source':       'embed',
    'video-player':       'PLAYER',
    'audio-caption':      'caption',
    'audio-player':       'PLAYER',
    }

class TumblrContentHandler(SGMLParser):
    def __init__(self, callback):
        SGMLParser.__init__(self)
        self.post = None
        self.curtag = None
        self.callback = callback
        self.photo = False

    def unknown_starttag(self, name, attrs):
        dattrs = dict()
        for k,v in attrs:
            dattrs[k] = v
        if name == 'post':
            self.post = dict()
            self.post['type'] = dattrs['type']
            self.post['date'] = dattrs['date']
            self.post['format'] = dattrs['format']
            self.post['tags'] = list()
            try:
                private = dattrs['private']
                if private == 'true':
                    self.post['private'] = 1
            except:
                pass
        elif self.post is not None:
            self.curtag = name
            if name == 'photo-url':
                maxwidth = dattrs['max-width']
                if maxwidth == '500':
                    self.photo = True
                else:
                    self.photo = False

    def unknown_endtag(self, name):
        if name == 'post':
            self.post['tags'] = ', '.join(self.post['tags'])
            self.callback(self.post)
            self.post = None
            self.curtag = None

    def handle_data(self, content):
        if self.curtag == 'tag':
            self.post['tags'].append(content)
        elif self.curtag == 'photo-url' and self.photo:
            self.post['source'] = content
        elif tag_map.has_key(self.curtag):
            tag = tag_map[self.curtag]
            try:
                self.post[tag] += content
            except:
                self.post[tag] = content

class SimpleContentHandler(SGMLParser):
    def __init__(self, fd):
        SGMLParser.__init__(self)
        self.fd = fd
        self.postCount = 0
        self.ok = False

    def start_post(self, attrs):
        sattrs = ' '.join(['%s="%s"' % (k, v) for k, v in attrs])
        if sattrs: sattrs = ' ' + sattrs
        self.ok = True
        self.post = '<post%s>' % (sattrs)

    def unknown_starttag(self, name, attrs):
        sattrs = ' '.join(['%s="%s"' % (k, v) for k, v in attrs])
        if sattrs: sattrs = ' ' + sattrs
        if self.ok:
            self.post += '<%s%s>' % (name, sattrs)

    def end_post(self):
        self.post += '</post>'
        self.ok = False
        self.fd.write(self.post + '\n')
        self.postCount += 1

    def unknown_endtag(self, name):
        if self.ok:
            self.post += '</%s>' % (name)

    def handle_data(self, content):
        if self.ok:
            self.post += content

    def getPostCount(self):
        return self.postCount

    def resetPostCount(self):
        self.postCount = 0

class IdList(SGMLParser):
    def __init__(self):
        SGMLParser.__init__(self)
        self.data = list()
        self.postCount = 0

    def start_post(self, attrs):
        self.postCount += 1
        for k, v in attrs:
            if k == 'id':
                self.data.append(v)
                break

    def getPostCount(self):
        return self.postCount

    def resetPostCount(self):
        self.postCount = 0

    def getIdList(self):
        return self.data

def ExportBlog(email, password, name, fname):
    api = TumblrAPI(email, password)
    fd = open(fname, 'w')
    fd.write('<?xml version="1.0" encoding="UTF-8"?>\n')
    fd.write('<posts>\n')
    print 'Fetching posts from', name
    count = 0
    parser = SimpleContentHandler(fd)
    while True:
        num = 50
        data = api.auth_read(name, start=count, num=num).strip()
#        data = api.read(name, start=count, num=num).strip()
        parser.feed(data)
        sys.stderr.write('.')
        if parser.getPostCount() == 0:
            break
        count += parser.getPostCount()
        parser.resetPostCount()
    print '\nGot', count, 'posts'
    fd.write('</posts>\n')
    fd.close()

def ImportBlog(email, password, name, fname):
    api = TumblrAPI(email, password)
    postList = list()
    def appendPost(post):
        postList.append(post)
    print 'Reading posts from', fname
    parser = TumblrContentHandler(appendPost)
    parser.feed(open(fname).read())
    postList.reverse()
    print 'Importing posts to', name
    count = 0
    for post in postList:
        post['group'] = name
#         print post
#         print '----'
        api.write(post)
        count += 1
        if count % 50 == 0:
            sys.stderr.write('.')
    print '\nDone'

def Auth(email, password):
    api = TumblrAPI(email, password)
    print api.authenticate()

def Clean(email, password, name):
    api = TumblrAPI(email, password)
    print 'Fetching posts from', name
    count = 0
    parser = IdList()
    while True:
        num = 50
        data = api.auth_read(name, start=count, num=num).strip()
        parser.feed(data)
        sys.stderr.write('.')
        if parser.getPostCount() == 0:
            break
        count += parser.getPostCount()
        parser.resetPostCount()
    print '\nGot', count, 'posts'
    print 'Deleting', count, 'posts from', name
    for item in parser.getIdList():
        api.delete(item)
    print 'Done.'

if __name__ == '__main__':
    # Exactly five arguments are expected.  Checking only for "too few" let
    # extra arguments through to the unpacking below, which then failed with
    # a ValueError traceback instead of the usage text.
    if len(sys.argv) != 6:
        sys.stderr.write(__doc__)
        sys.exit(1)
    cmd, email, password, blogName, fname = sys.argv[1:]
    if cmd == 'export':
        ExportBlog(email, password, blogName, fname)
    elif cmd == 'import':
        ImportBlog(email, password, blogName, fname)
    elif cmd == 'auth':
        Auth(email, password)
    elif cmd == 'clean':
        Clean(email, password, blogName)
    else:
        sys.stderr.write('Unknown command "%s"\n' % (cmd))
        # Report the failure to the calling shell as well.
        sys.exit(1)

  1. altergrin reblogged this from dmych
  2. russian reblogged this from dmych
  3. dmych posted this