add checker for mobi

pull/1/head
eric 2014-02-05 18:17:26 -05:00
parent d03e8d5097
commit f8df2507ee
4 changed files with 398 additions and 2 deletions

View File

@ -61,6 +61,7 @@ from regluit.core.lookups import (
)
from regluit.utils.localdatetime import now
from regluit.utils.fields import EpubFileField
from regluit.mobi import Mobi
logger = logging.getLogger(__name__)
@ -170,8 +171,11 @@ class EbookFileForm(forms.ModelForm):
if not zipfile.is_zipfile(the_file.file):
raise forms.ValidationError(_('%s is not a valid EPUB file' % the_file.name) )
elif format == 'mobi':
if not zipfile.is_zipfile(the_file.file):
raise forms.ValidationError(_('%s is not a valid MOBI file' % the_file.name) )
try:
book = Mobi(the_file.file);
book.parse();
except Exception as e:
raise forms.ValidationError(_('Are you sure this is a MOBI file?: %s' % e) )
elif format == 'pdf':
try:
doc = PdfFileReader(the_file.file)

286
mobi/__init__.py Normal file
View File

@ -0,0 +1,286 @@
#!/usr/bin/env python
# encoding: utf-8
"""
Mobi.py
Created by Elliot Kroo on 2009-12-25.
Copyright (c) 2009 Elliot Kroo. All rights reserved.
"""
import sys
import os
import unittest
from struct import *
from pprint import pprint
import utils
from lz77 import uncompress_lz77
class Mobi:
    """Reader for MOBI e-book files stored in a Palm Database container.

    Typical use::

        book = Mobi(path_or_file_object)
        book.parse()
        title = book.title()

    After :meth:`parse` the instance exposes:

    * ``contents`` -- the raw bytes of the whole file,
    * ``header``   -- the database header fields (see :meth:`parseHeader`),
    * ``records``  -- record info entries keyed by their position in the
      record list (see :meth:`parseRecordInfoList`),
    * ``config``   -- dict with the ``'palmdoc'``, ``'mobi'`` and ``'exth'``
      headers read from record 0 (``'exth'`` is ``None`` when absent).
    """

    def parse(self):
        """Read in the file, then parse the header, record table and record 0.

        Raises ``struct.error`` (from the header parsers) when the data is
        too short to contain the expected structures.
        """
        self.contents = self.f.read()
        # Everything is parsed from ``self.contents``; release a handle that
        # this object opened itself.  Caller-supplied file objects stay open.
        if self._ownsFile:
            self.f.close()
        self.header = self.parseHeader()
        self.records = self.parseRecordInfoList()
        self.readRecord0()

    def readRecord(self, recordnum, disable_compression=False):
        """Return the data of record ``recordnum``.

        With PalmDOC compression type 1, or when ``disable_compression`` is
        true, the raw record bytes are returned.  With compression type 2 the
        record (minus the trailing ``'extra bytes'``) is passed through
        ``uncompress_lz77``.  Returns ``None`` when the file has not been
        parsed yet or the compression type is neither 1 nor 2.
        """
        if not self.config:
            return None
        compression = self.config['palmdoc']['Compression']
        raw = compression == 1 or disable_compression
        if not raw and compression != 2:
            return None

        start = self.records[recordnum]['record Data Offset']
        # A record ends where the next one starts; the last record runs to
        # the end of the file.
        if recordnum + 1 in self.records:
            end = self.records[recordnum + 1]['record Data Offset']
        else:
            end = len(self.contents)

        if raw:
            return self.contents[start:end]
        return uncompress_lz77(
            self.contents[start:end - self.config['mobi']['extra bytes']])

    def readImageRecord(self, imgnum):
        """Return the raw bytes of image ``imgnum``.

        ``imgnum`` is counted from the MOBI header's ``'First Image index'``.
        Returns ``None`` when the file has not been parsed yet.
        """
        if not self.config:
            return None
        recordnum = self.config['mobi']['First Image index'] + imgnum
        return self.readRecord(recordnum, disable_compression=True)

    def author(self):
        "Returns the author of the book"
        # EXTH record type 100 is read as the author.  Fails with TypeError
        # when the file has no EXTH header and KeyError when it has no
        # record of type 100.
        return self.config['exth']['records'][100]

    def title(self):
        "Returns the title of the book"
        return self.config['mobi']['Full Name']

    ########### Private API ###########################

    def __init__(self, filename):
        """Create a reader for ``filename``.

        ``filename`` is either a ``str`` path, which is opened in binary
        mode, or an already open file-like object providing ``read()``.

        Raises ``IOError`` when the path cannot be opened.
        """
        # Set up front so the ``if self.config`` guards work before parse().
        self.config = None
        self._ownsFile = False
        try:
            if isinstance(filename, str):
                self.f = open(filename, "rb")
                self._ownsFile = True
            else:
                self.f = filename
        except IOError:
            sys.stderr.write("Could not open %s! " % filename)
            raise
        # Parse cursor into ``self.contents``.
        self.offset = 0

    def __iter__(self):
        """Yield the data of records 1 .. ``'First Non-book index'`` - 2."""
        if not self.config:
            return
        # NOTE(review): the upper bound stops one record short of
        # 'First Non-book index' -- confirm whether that is intended.
        for record in range(1, self.config['mobi']['First Non-book index'] - 1):
            yield self.readRecord(record)

    def _unpackFields(self, headerfmt, fields, offset):
        """Unpack ``headerfmt`` at ``offset`` and label the values.

        Returns ``(dict, length)`` where ``length`` is the packed size of
        ``headerfmt``.  Field names that are empty or start with "-" are
        dropped by ``utils.toDict``.
        """
        headerlen = calcsize(headerfmt)
        values = unpack(headerfmt, self.contents[offset:offset + headerlen])
        return utils.toDict(zip(fields, values)), headerlen

    def parseRecordInfoList(self):
        """Parse the record info list that follows the database header.

        Returns a dict mapping each entry's position in the list (0-based)
        to a dict with ``'record Data Offset'``, ``'UniqueID'`` and
        ``'record Attributes'``.  Entries are keyed by position because all
        readers (``readRecord``, ``parsePalmDOCHeader``, ...) address records
        by position; UniqueID values need not be consecutive.
        """
        records = {}
        entryfmt = '>II'
        entrylen = calcsize(entryfmt)
        for recordID in range(self.header['number of records']):
            dataOffset, packed = unpack(
                entryfmt, self.contents[self.offset:self.offset + entrylen])
            self.offset += entrylen
            records[recordID] = {
                'record Data Offset': dataOffset,
                # The top 8 bits of the packed word are really the record
                # attributes; the low 24 bits are the unique ID.
                'record Attributes': (packed & 0xFF000000) >> 24,
                'UniqueID': packed & 0x00FFFFFF,
            }
        return records

    def parseHeader(self):
        """Parse the database header at the current offset and advance past it."""
        headerfmt = '>32shhIIIIII4s4sIIH'
        fields = [
            "name",
            "attributes",
            "version",
            "created",
            "modified",
            "backup",
            "modnum",
            "appInfoId",
            "sortInfoID",
            "type",
            "creator",
            "uniqueIDseed",
            "nextRecordListID",
            "number of records",
        ]
        resultsDict, headerlen = self._unpackFields(
            headerfmt, fields, self.offset)
        self.offset += headerlen
        return resultsDict

    def readRecord0(self):
        """Parse the headers held in record 0 and store them in ``self.config``."""
        palmdocHeader = self.parsePalmDOCHeader()
        MobiHeader = self.parseMobiHeader()

        exthHeader = None
        if MobiHeader['Has EXTH Header']:
            exthHeader = self.parseEXTHHeader()

        self.config = {
            'palmdoc': palmdocHeader,
            'mobi': MobiHeader,
            'exth': exthHeader,
        }

    def parseEXTHHeader(self):
        """Parse the EXTH header and its records at the current offset.

        Returns the header fields plus ``'records'``, a dict mapping each
        record type to its data.  When a type occurs more than once the last
        occurrence wins.

        Raises ``ValueError`` when a record declares a length shorter than
        its own 8-byte type/length prefix, and ``struct.error`` when the data
        ends before all declared records were read.
        """
        headerfmt = '>III'
        fields = [
            'identifier',
            'header length',
            'record Count',
        ]
        resultsDict, headerlen = self._unpackFields(
            headerfmt, fields, self.offset)
        self.offset += headerlen

        resultsDict['records'] = {}
        # Counted with a plain loop rather than range(): 'record Count' is an
        # unchecked 32-bit value taken from the file.
        remaining = resultsDict['record Count']
        while remaining > 0:
            recordType, recordLen = unpack(
                ">II", self.contents[self.offset:self.offset + 8])
            if recordLen < 8:
                # recordLen includes the 8 prefix bytes; anything smaller
                # would leave the cursor stuck on the same bytes.
                raise ValueError(
                    "Invalid EXTH record length: %d" % recordLen)
            resultsDict['records'][recordType] = (
                self.contents[self.offset + 8:self.offset + recordLen])
            self.offset += recordLen
            remaining -= 1

        return resultsDict

    def parseMobiHeader(self):
        """Parse the MOBI header at the current offset and advance past it.

        Besides the unpacked fields the result carries the derived entries
        ``'Start Offset'``, ``'Full Name'``, ``'Has DRM'``,
        ``'Has EXTH Header'`` and ``'extra bytes'``.
        """
        headerfmt = '> IIII II 40s III IIIII IIII I 36s IIII 8s HHIIIII'
        fields = [
            "identifier",
            "header length",
            "Mobi type",
            "text Encoding",

            "Unique-ID",
            "Generator version",

            "-Reserved",

            "First Non-book index",
            "Full Name Offset",
            "Full Name Length",

            "Language",
            "Input Language",
            "Output Language",
            "Format version",
            "First Image index",

            "First Huff Record",
            "Huff Record Count",
            "First DATP Record",
            "DATP Record Count",

            "EXTH flags",

            "-36 unknown bytes, if Mobi is long enough",

            "DRM Offset",
            "DRM Count",
            "DRM Size",
            "DRM Flags",

            "-Usually Zeros, unknown 8 bytes",

            "-Unknown",
            "Last Image Record",
            "-Unknown",
            "FCIS record",
            "-Unknown",
            "FLIS record",
            "Unknown",
        ]
        resultsDict, _ = self._unpackFields(headerfmt, fields, self.offset)

        resultsDict['Start Offset'] = self.offset

        # The full name is located relative to the start of record 0.
        nameStart = (self.records[0]['record Data Offset']
                     + resultsDict['Full Name Offset'])
        resultsDict['Full Name'] = self.contents[
            nameStart:nameStart + resultsDict['Full Name Length']]

        resultsDict['Has DRM'] = resultsDict['DRM Offset'] != 0xFFFFFFFF
        resultsDict['Has EXTH Header'] = (resultsDict['EXTH flags'] & 0x40) != 0

        # Advance by the length the header declares for itself, not by the
        # size of ``headerfmt``.
        self.offset += resultsDict['header length']

        # 'extra bytes' is twice the number of set bits, ignoring bit 0, in
        # the 16-bit value held in the last two bytes of the header.
        # readRecord() trims that many bytes off a record before inflating it.
        flags, = unpack(">H", self.contents[self.offset - 2:self.offset])
        resultsDict['extra bytes'] = 2 * bin(flags & 0xFFFE).count("1")

        return resultsDict

    def parsePalmDOCHeader(self):
        """Parse the PalmDOC header at the start of record 0.

        Leaves ``self.offset`` pointing just past the header.
        """
        headerfmt = '>HHIHHHH'
        fields = [
            "Compression",
            "Unused",
            "text length",
            "record count",
            "record size",
            "Encryption Type",
            "Unknown",
        ]
        offset = self.records[0]['record Data Offset']
        resultsDict, headerlen = self._unpackFields(headerfmt, fields, offset)
        self.offset = offset + headerlen
        return resultsDict
class MobiTests(unittest.TestCase):
    """Tests that parse the sample book at ``../test/CharlesDarwin.mobi``.

    The path is relative, so the tests must be run from a directory where
    it resolves.
    """

    def setUp(self):
        self.mobitest = Mobi("../test/CharlesDarwin.mobi")

    def testParse(self):
        """Parsing succeeds; the parsed headers are printed for inspection."""
        self.mobitest.parse()
        pprint(self.mobitest.config)

    def testRead(self):
        """Records 1-4 can be read and concatenated without raising."""
        self.mobitest.parse()
        content = ""
        for i in range(1, 5):
            content += self.mobitest.readRecord(i)

    def testImage(self):
        """The first four image records are written out as .jpg files."""
        self.mobitest.parse()
        pprint(self.mobitest.records)
        for record in range(4):
            # Image data is binary: write in 'wb' mode, and let the context
            # manager close the handle even if the write fails.
            with open("imagerecord%d.jpg" % record, 'wb') as f:
                f.write(self.mobitest.readImageRecord(record))

    def testAuthorTitle(self):
        """Author and title match the values expected for the sample book."""
        self.mobitest.parse()
        self.assertEqual(self.mobitest.author(), 'Charles Darwin')
        self.assertEqual(self.mobitest.title(), 'The Origin of Species by means '+
                         'of Natural Selection, 6th Edition')


if __name__ == '__main__':
    unittest.main()

86
mobi/lz77.py Normal file
View File

@ -0,0 +1,86 @@
import struct
# ported directly from the PalmDoc Perl library
# http://kobesearch.cpan.org/htdocs/EBook-Tools/EBook/Tools/PalmDoc.pm.html
def uncompress_lz77(data):
    """Decompress PalmDoc-compressed record data.

    ``data`` is a byte string (``str`` on Python 2, ``bytes`` on Python 3);
    the decompressed bytes are returned as the same kind of object.

    Each input byte selects one of these cases:

    * ``0x00``          -- copied through as a literal,
    * ``0x01`` - ``0x08`` -- that many following bytes are copied literally,
    * ``0x09`` - ``0x7f`` -- copied through as a literal,
    * ``0x80`` - ``0xbf`` -- first byte of a two-byte length/distance pair
      that repeats bytes already written to the output,
    * ``0xc0`` - ``0xff`` -- a space followed by the byte XOR ``0x80``.

    On malformed input a warning is printed and the text decompressed so far
    is returned.
    """
    data = bytearray(data)
    length = len(data)
    offset = 0          # current offset into data
    text = bytearray()  # output (uncompressed) text

    while offset < length:
        ord_ = data[offset]
        offset += 1

        if ord_ == 0 or 0x09 <= ord_ <= 0x7f:
            # Nulls and values from 0x09 through 0x7f are literal.
            text.append(ord_)

        elif ord_ <= 8:
            # The next ord_ bytes are literal.
            text += data[offset:offset + ord_]
            offset += ord_

        elif ord_ <= 0xbf:
            # Data is LZ77-compressed: a two-byte sequence in which, of the
            # 16 bits, 2 identify the sequence, 11 encode the distance and
            # 3 encode the length.
            offset += 1
            if offset > length:
                print("WARNING: offset to LZ77 bits is outside of the data: %d" % offset)
                return bytes(text)

            # Big-endian 16-bit value with the two leftmost ID bits dropped.
            lz77 = ((ord_ << 8) | data[offset - 1]) & 0x3fff

            # Length is the rightmost 3 bits + 3.
            lz77length = (lz77 & 0x0007) + 3

            # The remaining 11 bits are the distance back into the output.
            lz77offset = lz77 >> 3
            if lz77offset < 1:
                print("WARNING: LZ77 decompression offset is invalid!")
                return bytes(text)

            textpos = len(text) - lz77offset
            if textpos < 0:
                print("WARNING: LZ77 decompression reference is before" +
                      " beginning of text! %x" % lz77)
                # Return the partial text, as the other warning paths do.
                return bytes(text)

            # Copy one byte at a time: the reference may run into bytes that
            # this very copy is producing (distance smaller than length).
            for _ in range(lz77length):
                text.append(text[textpos])
                textpos += 1

        else:
            # 0xc0 - 0xff are single characters (XOR 0x80) preceded by a
            # space.
            text.append(0x20)
            text.append(ord_ ^ 0x80)

    return bytes(text)

20
mobi/utils.py Normal file
View File

@ -0,0 +1,20 @@
#!/usr/bin/env python
# encoding: utf-8
"""
utils.py
Created by Elliot Kroo on 2009-12-25.
Copyright (c) 2009 Elliot Kroo. All rights reserved.
"""
import sys
import os
import unittest
def toDict(tuples):
    """Build a dict from ``(field, value)`` pairs.

    Pairs whose field name is empty or begins with "-" are left out.  When a
    field name occurs more than once, the last value wins.
    """
    return dict(
        (name, value)
        for name, value in tuples
        if len(name) > 0 and name[0] != "-"
    )