#!/usr/local/bin/python
"""
mtdb2blosxom takes Movable Type BerkeleyDB files, and exports them into a
format PyBlosxom can use.

mtdb2blosxom makes some assumptions about your configuration of PyBlosxom.
    1. That you are using comments.py for comments. If this is not the
       case, just ignore all the comment files the script spits out.
    2. That you have the 'linebreaks' plugin installed/available.

The script is written for Python 2 (reading the Movable Type databases
needs the ``bsddb`` or ``bsddb185`` module).  The syntax is kept importable
on Python 3 as well, so the pure helpers can be reused and tested there;
only opening a BerkeleyDB file requires one of the bsddb modules.
"""

########## Configuration
MovableTypeDBPath = '/path/to/mt/dbs'
BlosxomOutputPath = '/path/to/blosxom/output'

########## Imports
import sys
import os
import os.path
import time
import random
import struct
import re
import pprint
import collections

try:
    import cPickle as _pickle       # Python 2: C implementation
except ImportError:
    import pickle as _pickle

try:
    import UserDict                 # Python 2
    _MappingBase = UserDict.DictMixin
except ImportError:
    import collections.abc
    _MappingBase = collections.abc.Mapping

try:
    from html import escape as _escapeImpl
except ImportError:
    from cgi import escape as _escapeImpl   # Python 2

# The BerkeleyDB module is only needed for '.db' files; a missing module is
# reported when such a file is opened, so pickled dumps stay usable.
try:
    import bsddb
    _dbModule = bsddb
except ImportError:
    try:
        import bsddb185
        _dbModule = bsddb185
    except ImportError:
        _dbModule = None


########## Utilities
def mkdirIfNeeded(dir, mod=0o755):
    """Create directory *dir* with mode *mod*, or chmod it if it exists.

    The parameter names are kept for backward compatibility even though
    ``dir`` shadows a builtin.
    """
    if not os.path.isdir(dir):
        os.mkdir(dir, mod)
    else:
        os.chmod(dir, mod)

# Historical alias: older callers reach this helper through the os module.
os.mkdirIfNeeded = mkdirIfNeeded


def textToFileName(str):
    """Return *str* stripped, with every non-alphanumeric char as '_'."""
    return re.sub(r"[^a-zA-Z0-9]", "_", str.strip())


def MTDateToTimeTuple(date):
    """Convert a 'YYYYMMDDhhmmss' string into seconds since the epoch.

    Despite its name the function returns the float produced by
    ``time.mktime``, not a tuple.  The timestamp is interpreted as local
    time; ``tm_isdst`` is -1 so the C library decides whether daylight
    saving applies (a hard-coded 0 shifts summer-time entries by an hour).
    """
    year = int(date[0:4])
    month = int(date[4:6])
    day = int(date[6:8])
    hour = int(date[8:10])
    minute = int(date[10:12])
    second = int(date[12:14])
    return time.mktime((year, month, day, hour, minute, second, 0, 0, -1))


def _intField(record, key, default=0):
    """Return record[key] as an int; *default* if missing, None or empty."""
    value = record.get(key, default)
    if value in (None, '', b''):
        return default
    return int(value)


def _escape(value):
    """HTML-escape '&', '<' and '>' in *value*; None becomes ''."""
    if value is None:
        return ''
    return _escapeImpl(value, False)


def _xmlCommentText(value):
    """Escape *value* for use inside an XML comment ('--' is illegal there)."""
    return _escape(value).replace('--', '- -')


########## Classes
class MTBlog(object):
    """One Movable Type blog record together with its entries."""

    def __init__(self, dict):
        # 'dict' shadows the builtin; the name is kept for compatibility.
        record = dict
        self.allow_comments_default = _intField(record, 'allow_comments_default')
        self.allow_pings_default = _intField(record, 'allow_pings_default')
        self.archive_path = record.get('archive_path', None)
        self.archive_type = record.get('archive_type', None)
        self.archive_type_preferred = record.get('archive_type_preferred', None)
        self.archive_url = record.get('archive_url', None)
        self.convert_paras = record.get('convert_paras', None)
        self.convert_paras_comments = record.get('convert_paras_comments', None)
        self.days_on_index = _intField(record, 'days_on_index')
        self.file_extension = record.get('file_extension', None)
        self.id = _intField(record, 'id')
        self.is_dynamic = record.get('is_dynamic', None)
        self.language = record.get('language', None)
        self.name = record.get('name', None)
        self.ping_blogs = _intField(record, 'ping_blogs')
        self.ping_weblogs = _intField(record, 'ping_weblogs')
        self.sanitize_spec = _intField(record, 'sanitize_spec')
        self.server_offset = _intField(record, 'server_offset')
        self.site_path = record.get('site_path', None)
        self.site_url = record.get('site_url', None)
        self.sort_order_comments = record.get('sort_order_comments', None)
        self.sort_order_posts = record.get('sort_order_posts', None)
        self.status_default = _intField(record, 'status_default')
        self.words_in_excerpt = _intField(record, 'words_in_excerpt')
        self.entries = []

    def __repr__(self):
        return pprint.pformat({'name': self.name, 'entries': self.entries})

    def addEntry(self, entry):
        """Attach *entry*, which must belong to this blog."""
        assert entry.blog_id == self.id
        self.entries.append(entry)

    def convertToBlosxom(self, dataDir):
        """Write every entry and comment of this blog below *dataDir*.

        The blog gets its own directory (named after the blog) holding an
        'entries/' and a 'comments/' subdirectory.
        """
        dirName = textToFileName(self.name or '')
        if not dirName:
            # Unnamed blog: fall back to the id, like unnamed entries do.
            dirName = 'Blog_%d' % self.id
        blogDir = os.path.join(dataDir, dirName)
        mkdirIfNeeded(blogDir)
        entriesDir = os.path.join(blogDir, 'entries/')
        mkdirIfNeeded(entriesDir)
        commentsDir = os.path.join(blogDir, 'comments/')
        mkdirIfNeeded(commentsDir)
        for entry in self.entries:
            entry.convertToBlosxom(entriesDir, commentsDir)


class MTAuthor(object):
    """One Movable Type author record."""

    def __init__(self, dict):
        record = dict
        self.can_create_blog = _intField(record, 'can_create_blog')
        self.can_view_log = _intField(record, 'can_view_log')
        self.created_by = _intField(record, 'created_by')
        self.email = record.get('email', None)
        self.hint = record.get('hint', None)
        self.id = _intField(record, 'id')
        self.name = record.get('name', None)
        self.nickname = record.get('nickname', None)
        self.password = record.get('password', None)
        self.preferred_language = record.get('preferred_language', None)
        self.url = record.get('url', None)

    def __repr__(self):
        # __repr__ must return a string even when the name is missing.
        return str(self.name)


class MTEntry(object):
    """One Movable Type entry with its author, placements and comments."""

    def __init__(self, dict):
        record = dict
        self.allow_comments = _intField(record, 'allow_comments')
        self.allow_pings = record.get('allow_pings', None)
        self.author_id = _intField(record, 'author_id')
        self.blog_id = _intField(record, 'blog_id')
        self.convert_breaks = record.get('convert_breaks', None)
        self.created_on = MTDateToTimeTuple(record.get('created_on', None))
        self.excerpt = record.get('excerpt', None)
        self.id = _intField(record, 'id')
        self.keywords = record.get('keywords', None)
        self.modified_on = MTDateToTimeTuple(record.get('modified_on', None))
        self.pinged_urls = record.get('pinged_urls', None)
        self.status = _intField(record, 'status')
        self.tangent_cache = record.get('tangent_cache', None)
        self.text = record.get('text', None)
        self.text_more = record.get('text_more', None)
        self.title = record.get('title', None)
        self.to_ping_urls = record.get('to_ping_urls', None)
        self.author = None
        self.placements = []
        self.comments = []

    def __repr__(self):
        return pprint.pformat({'title': self.title,
                               'author': self.author,
                               'comments': self.comments})

    def setAuthor(self, author):
        """Attach *author*, whose id must match this entry's author_id."""
        assert author.id == self.author_id
        self.author = author

    def addPlacement(self, placement):
        """Attach a category placement belonging to this entry."""
        assert placement.blog_id == self.blog_id
        assert placement.entry_id == self.id
        self.placements.append(placement)

    def addComment(self, comment):
        """Attach a comment belonging to this entry."""
        assert comment.blog_id == self.blog_id
        assert comment.entry_id == self.id
        self.comments.append(comment)

    def primaryPlacement(self):
        """Return the first placement flagged primary, or None."""
        for place in self.placements:
            if place.is_primary:
                return place
        return None

    def _categoryDir(self, baseDir):
        """Return (creating it if needed) this entry's directory in *baseDir*.

        That is the subdirectory named after the primary category, or
        *baseDir* itself when the entry has no usable primary category.
        """
        placement = self.primaryPlacement()
        category = None
        if placement is not None:
            category = placement.category
        if category is not None and category.label:
            categoryDir = os.path.join(baseDir, category.label)
        else:
            categoryDir = baseDir
        mkdirIfNeeded(categoryDir)
        return categoryDir

    def _writePostToBlosxomFile(self, filePath):
        """Write the entry to *filePath*: title, '#' metadata lines, body.

        The file's access and modification times are set to the entry's
        creation time.
        """
        lines = [self.title or '']

        # NOTE(review): the original computed a convertBreaks flag (with a
        # duplicated "== '0'" branch) but never used it, so the parser line
        # was always written.  The flag is honoured here: only an explicit
        # '0' disables the linebreaks parser.  Confirm this is the intent.
        # Improve me: the default should load from the Blog.
        if self.convert_breaks != '0':
            lines.append('#parser linebreaks')

        if self.author is not None and self.author.name:
            lines.append('#mt_author %s' % self.author.name)

        labels = [place.category.label for place in self.placements
                  if place.category is not None and place.category.label]
        if labels:
            lines.append('#mt_categories %s' % ', '.join(labels))

        lines.append('#mt_created_on %s'
                     % time.strftime('%x %X', time.localtime(self.created_on)))
        lines.append('#mt_modified_on %s'
                     % time.strftime('%x %X', time.localtime(self.modified_on)))

        text = self.text or ''
        excerpt = self.excerpt or ''
        more = self.text_more or ''
        body = ''
        # Prepend the excerpt unless the text evidently starts with it.
        if text[0:32] != excerpt[0:32]:
            body += excerpt
        body += text
        body += more
        lines.append(body)

        with open(filePath, 'w') as f:
            f.write('\n'.join(lines) + '\n')
        os.utime(filePath, (self.created_on, self.created_on))

    def _prepareBlosxomFile(self, entryDir):
        """Return the path the entry should be written to below *entryDir*.

        Creates the category directory as a side effect.  Entries sharing
        a title and category map to the same path.
        """
        categoryDir = self._categoryDir(entryDir)
        fn = textToFileName(self.title or '')
        if not fn:
            fn = 'Entry_%d' % self.id
        #Improve Me:
        # I don't know what all the status codes are, but 2 == published
        if self.status == 2:
            ext = '.txt'
        else:
            ext = '.txt-'
        return os.path.join(categoryDir, fn + ext)

    def _writeBlosxomCommentFiles(self, commentsDir, entryFileName):
        """Write one comment file per comment below *commentsDir*."""
        if not self.comments:
            return
        categoryDir = self._categoryDir(commentsDir)
        for comment in self.comments:
            comment.convertToBlosxom(categoryDir, entryFileName)

    def convertToBlosxom(self, entryDir, commentsDir):
        """Write the entry file and the files of all its comments."""
        filePath = self._prepareBlosxomFile(entryDir)
        if filePath:
            self._writePostToBlosxomFile(filePath)
            self._writeBlosxomCommentFiles(commentsDir,
                                           os.path.basename(filePath))


class MTComment(object):
    """One Movable Type comment record."""

    def __init__(self, dict):
        record = dict
        self.author = record.get('author', None)
        self.blog_id = _intField(record, 'blog_id')
        self.created_on = MTDateToTimeTuple(record.get('created_on', None))
        self.email = record.get('email', None)
        self.entry_id = _intField(record, 'entry_id')
        self.id = _intField(record, 'id')
        self.ip = record.get('ip', None)
        self.modified_on = MTDateToTimeTuple(record.get('modified_on', None))
        self.text = record.get('text', None)
        self.url = record.get('url', None)

    def __repr__(self):
        # __repr__ must return a string even when the author is missing.
        return str(self.author)

    def convertToBlosxom(self, commentsDir, entryFileName):
        """Write this comment as '<entry>-<timestamp>.cmt' in *commentsDir*.

        The file's access and modification times are set to the comment's
        creation time.
        """
        entryFileName = os.path.splitext(entryFileName)[0]
        filePath = os.path.join(
            commentsDir, '%s-%f.cmt' % (entryFileName, self.created_on))

        # NOTE(review): the markup in these format strings was missing from
        # the source (they were empty, so the '%' formatting raised
        # TypeError).  The tags below are reconstructed from the .cmt layout
        # of the PyBlosxom comments plugin; the encoding and the email/ip
        # trailer (written as XML comments so the document keeps a single
        # root element) are best guesses -- confirm against the plugin.
        lines = [
            '<?xml version="1.0" encoding="iso-8859-1"?>',
            '<item>',
            '\t<title>%s</title>' % '',     #This should be title
            '\t<author>%s</author>' % _escape(self.author),
            '\t<link>%s</link>' % _escape(self.url),
            '\t<source>%s</source>' % '',
            '\t<pubDate>%s</pubDate>' % str(self.created_on),
            '\t<description>%s</description>' % _escape(self.text),
            '</item>',
            '<!-- email: %s -->' % _xmlCommentText(self.email),
            '<!-- ip: %s -->' % _xmlCommentText(self.ip),
        ]
        with open(filePath, 'w') as f:
            f.write('\n'.join(lines) + '\n')
        os.utime(filePath, (self.created_on, self.created_on))


class MTCategory(object):
    """One Movable Type category record."""

    def __init__(self, dict):
        record = dict
        self.author_id = _intField(record, 'author_id')
        self.blog_id = _intField(record, 'blog_id')
        self.id = _intField(record, 'id')
        self.label = record.get('label')

    def __repr__(self):
        # __repr__ must return a string even when the label is missing.
        return str(self.label)


class MTPlacement(object):
    """Link between an entry and one of its categories."""

    def __init__(self, dict):
        record = dict
        self.blog_id = _intField(record, 'blog_id')
        self.category_id = _intField(record, 'category_id')
        self.entry_id = _intField(record, 'entry_id')
        self.id = _intField(record, 'id')
        self.is_primary = _intField(record, 'is_primary')
        self.category = None

    def setCategory(self, cat):
        """Attach *cat*, whose id must match this placement's category_id."""
        assert cat.id == self.category_id
        self.category = cat


class MTDatabase(_MappingBase):
    """Read-only mapping over one Movable Type database file.

    Values are "thawed" lazily from the serialized form (see _thaw) and
    cached.  A path ending in '.pickle' is loaded as a pickled dict of
    already-thawed records instead.
    """

    def __init__(self, filePath):
        """Open *filePath*.

        Raises IOError if the BerkeleyDB file cannot be read and
        ImportError if no bsddb module is available to read it.
        """
        if filePath.endswith('.pickle'):
            # SECURITY: unpickling can execute arbitrary code; only load
            # pickle files that this script produced itself.
            with open(filePath, 'rb') as handle:
                self._db = _pickle.load(handle)
            self._thawCache = self._db
        else:
            if _dbModule is None:
                raise ImportError('Could not import any bsddb modules')
            try:
                self._db = _dbModule.btopen(filePath, 'r')
            except _dbModule.error:
                raise IOError('Could not read MT database at "%s"' % filePath)
            self._thawCache = {}

    def toPickleFile(self, outPath):
        """Dump all (thawed) records as a pickled dict to *outPath*."""
        with open(outPath, 'wb') as handle:
            _pickle.dump(dict(self), handle)

    def keys(self):
        return self._db.keys()

    def __iter__(self):
        return iter(self.keys())

    def __len__(self):
        return len(self.keys())

    def __getitem__(self, key):
        try:
            return self._thawCache[key]
        except KeyError:
            pass
        thawed = self._thaw(self._db[key])
        self._thawCache[key] = thawed
        return thawed

    def _thaw(self, value):
        """Decode a serialized record into a dict (or a list of dicts).

        A byte string must start with the magic 'SERG' followed by repeated
        (key length, key, value length, value) groups, each length being a
        4-byte big-endian unsigned integer.  A list is thawed element-wise.

        Raises RuntimeError on a bad magic value and TypeError for any
        other value type.
        """
        if isinstance(value, bytes):
            #Check magic
            if value[0:4] != b'SERG':
                raise RuntimeError('Magic value no good')
            record = {}
            payload = value[4:]
            length = len(payload)
            pos = 0
            while pos < length:
                #Read key
                keyLen = struct.unpack_from('!L', payload, pos)[0]
                pos += 4
                col = payload[pos:pos + keyLen]
                pos += keyLen
                #Read value
                valueLen = struct.unpack_from('!L', payload, pos)[0]
                pos += 4
                record[col] = payload[pos:pos + valueLen]
                pos += valueLen
            return record
        if isinstance(value, list):
            return [self._thaw(item) for item in value]
        raise TypeError('Unknown type of frozen object: %s' % str(type(value)))

    def __setitem__(self, key, item):
        raise NotImplementedError

    def __delitem__(self, key):
        raise NotImplementedError


class MovableType(object):
    """All blogs of a Movable Type installation, loaded from its databases."""

    def __init__(self, dbDir, pickled=False):
        """Load the databases found in *dbDir* ('~' is expanded).

        With *pickled* true the '<name>.db.pickle' files are read instead
        of the '<name>.db' BerkeleyDB files.
        """
        dbDir = os.path.expanduser(dbDir)
        self._load(dbDir, pickled)

    def convertToBlosxom(self, dataDir):
        """Export every blog below *dataDir* ('~' is expanded)."""
        dataDir = os.path.expanduser(dataDir)
        self._store(dataDir)

    def _load(self, dbDir, pickled):
        """Read all databases and link the records into an object tree."""
        if pickled:
            ext = '.db.pickle'
        else:
            ext = '.db'

        #Get our DBs
        authorDB = MTDatabase(os.path.join(dbDir, 'author' + ext))
        blogDB = MTDatabase(os.path.join(dbDir, 'blog' + ext))
        entryDB = MTDatabase(os.path.join(dbDir, 'entry' + ext))
        commentDB = MTDatabase(os.path.join(dbDir, 'comment' + ext))
        categoryDB = MTDatabase(os.path.join(dbDir, 'category' + ext))
        placementDB = MTDatabase(os.path.join(dbDir, 'placement' + ext))

        #Convert them all to objects
        allBlogs = [MTBlog(x) for x in blogDB.values()]
        allEntries = [MTEntry(x) for x in entryDB.values()]
        allAuthors = [MTAuthor(x) for x in authorDB.values()]
        allComments = [MTComment(x) for x in commentDB.values()]
        allCategories = [MTCategory(x) for x in categoryDB.values()]
        allPlacements = [MTPlacement(x) for x in placementDB.values()]

        #Build the object hierarchy from the relations, bottom up.  Records
        #are indexed by their foreign keys first so each relation is
        #resolved with one lookup instead of a nested scan.

        #Placements: Set the category of a placement
        categoriesById = dict((cat.id, cat) for cat in allCategories)
        for placement in allPlacements:
            category = categoriesById.get(placement.category_id)
            if category is not None:
                placement.setCategory(category)

        #Entries: Set the author, comments and placements
        authorsById = dict((author.id, author) for author in allAuthors)
        commentsByEntry = collections.defaultdict(list)
        for comment in allComments:
            commentsByEntry[(comment.blog_id, comment.entry_id)].append(comment)
        placementsByEntry = collections.defaultdict(list)
        for placement in allPlacements:
            placementsByEntry[(placement.blog_id, placement.entry_id)].append(placement)

        for entry in allEntries:
            author = authorsById.get(entry.author_id)
            if author is not None:
                entry.setAuthor(author)
            entryKey = (entry.blog_id, entry.id)
            for comment in commentsByEntry.get(entryKey, ()):
                entry.addComment(comment)
            for placement in placementsByEntry.get(entryKey, ()):
                entry.addPlacement(placement)

        #Blogs: Set the entries
        entriesByBlog = collections.defaultdict(list)
        for entry in allEntries:
            entriesByBlog[entry.blog_id].append(entry)
        for blog in allBlogs:
            for entry in entriesByBlog.get(blog.id, ()):
                blog.addEntry(entry)

        #Set the root
        self.blogs = allBlogs

    def _store(self, dataDir):
        """Create *dataDir* if needed and export every blog into it."""
        mkdirIfNeeded(dataDir)
        for blog in self.blogs:
            blog.convertToBlosxom(dataDir)

    def __repr__(self):
        return pprint.pformat(self.blogs)


########## Main
def main():
    """Convert the configured Movable Type databases to PyBlosxom files."""
    mt = MovableType(MovableTypeDBPath)
    mt.convertToBlosxom(BlosxomOutputPath)


if __name__ == '__main__':
    main()