# -*- coding: utf-8 -*- # MIT License (see LICENSE.txt or https://opensource.org/licenses/MIT) """Implements a class with methods related to the Chrome OS image""" from __future__ import absolute_import, division, unicode_literals import os from struct import calcsize, unpack from zipfile import ZipFile from io import UnsupportedOperation from ..kodiutils import exists, localize, log, mkdirs from .. import config from ..unicodes import compat_path class ChromeOSImage: """ The main class handling a Chrome OS image Information related to ext2 is sourced from here: https://www.nongnu.org/ext2-doc/ext2.html """ def __init__(self, imgpath): """Prepares the image""" self.imgpath = imgpath self.bstream = self.get_bstream(imgpath) self.part_offset = None self.sb_dict = None self.blocksize = None self.blk_groups = None self.progress = None def gpt_header(self): """Returns the needed parts of the GPT header, can be easily expanded if necessary""" header_fmt = '<8s4sII4x4Q16sQ3I' header_size = calcsize(header_fmt) lba_size = config.CHROMEOS_BLOCK_SIZE # assuming LBA size self.seek_stream(lba_size) # GPT Header entries: signature, revision, header_size, header_crc32, (reserved 4x skipped,) current_lba, backup_lba, # first_usable_lba, last_usable_lba, disk_guid, start_lba_part_entries, num_part_entries, # size_part_entry, crc32_part_entries _, _, _, _, _, _, _, _, _, start_lba_part_entries, num_part_entries, size_part_entry, _ = unpack(header_fmt, self.read_stream(header_size)) return (start_lba_part_entries, num_part_entries, size_part_entry) def chromeos_offset(self): """Calculate the Chrome OS losetup start offset""" part_format = '<16s16sQQQ72s' entries_start, entries_num, entry_size = self.gpt_header() # assuming partition table is GPT lba_size = config.CHROMEOS_BLOCK_SIZE # assuming LBA size self.seek_stream(entries_start * lba_size) if not calcsize(part_format) == entry_size: log(4, 'Partition table entries are not 128 bytes long') return 0 for index in range(1, entries_num + 1): # pylint: disable=unused-variable # Entry: type_guid, unique_guid, first_lba, last_lba, attr_flags, part_name _, _, first_lba, _, _, part_name = unpack(part_format, self.read_stream(entry_size)) part_name = part_name.decode('utf-16').strip('\x00') if part_name == 'ROOT-A': # assuming partition name is ROOT-A offset = first_lba * lba_size break if not offset: log(4, 'Failed to calculate losetup offset.') return 0 return offset def extract_file(self, filename, extract_path, progress): """Extracts the file from the image""" self.progress = progress self.progress.update(2, localize(30060)) self.part_offset = self.chromeos_offset() self.sb_dict = self.superblock() self.blk_groups = self.block_groups() bin_filename = filename.encode('ascii') chunksize = 4 * 1024**2 percent8 = 40 self.progress.update(int(percent8 / 8), localize(30061)) chunk1 = self.read_stream(chunksize) while True: chunk2 = self.read_stream(chunksize) if not chunk2: log(4, 'File {filename} not found in the ChromeOS image', filename=filename) return False chunk = chunk1 + chunk2 if bin_filename in chunk: i_index_pos = chunk.index(bin_filename) - 8 dir_dict = self.dir_entry(chunk[i_index_pos:i_index_pos + len(filename) + 8]) if dir_dict['inode'] < self.sb_dict['s_inodes_count'] and dir_dict['name_len'] == len(filename): break chunk1 = chunk2 if percent8 < 240: percent8 += 1 self.progress.update(int(percent8 / 8)) self.progress.update(32, localize(30062)) blk_group_num = (dir_dict['inode'] - 1) // self.sb_dict['s_inodes_per_group'] blk_group = self.blk_groups[blk_group_num] i_index_in_group = (dir_dict['inode'] - 1) % self.sb_dict['s_inodes_per_group'] inode_pos = self.part_offset + self.blocksize * blk_group['bg_inode_table'] + self.sb_dict['s_inode_size'] * i_index_in_group inode_dict, _ = self.inode_table(inode_pos) return self.write_file(inode_dict, os.path.join(extract_path, filename)) def superblock(self): """Get relevant info from the superblock, assert it's an ext2 fs""" names = ('s_inodes_count', 's_blocks_count', 's_r_blocks_count', 's_free_blocks_count', 's_free_inodes_count', 's_first_data_block', 's_log_block_size', 's_log_frag_size', 's_blocks_per_group', 's_frags_per_group', 's_inodes_per_group', 's_mtime', 's_wtime', 's_mnt_count', 's_max_mnt_count', 's_magic', 's_state', 's_errors', 's_minor_rev_level', 's_lastcheck', 's_checkinterval', 's_creator_os', 's_rev_level', 's_def_resuid', 's_def_resgid', 's_first_ino', 's_inode_size', 's_block_group_nr', 's_feature_compat', 's_feature_incompat', 's_feature_ro_compat', 's_uuid', 's_volume_name', 's_last_mounted', 's_algorithm_usage_bitmap', 's_prealloc_block', 's_prealloc_dir_blocks') fmt = '<13I6H4I2HI2H3I16s16s64sI2B818x' fmt_len = calcsize(fmt) self.seek_stream(self.part_offset + 1024) # superblock starts after 1024 byte pack = self.read_stream(fmt_len) sb_dict = dict(list(zip(names, unpack(fmt, pack)))) sb_dict['s_magic'] = hex(sb_dict['s_magic']) assert sb_dict['s_magic'] == '0xef53' # assuming/checking this is an ext2 fs block_groups_count1 = sb_dict['s_blocks_count'] / sb_dict['s_blocks_per_group'] block_groups_count1 = int(block_groups_count1) if float(int(block_groups_count1)) == block_groups_count1 else int(block_groups_count1) + 1 block_groups_count2 = sb_dict['s_inodes_count'] / sb_dict['s_inodes_per_group'] block_groups_count2 = int(block_groups_count2) if float(int(block_groups_count2)) == block_groups_count2 else int(block_groups_count2) + 1 assert block_groups_count1 == block_groups_count2 sb_dict['block_groups_count'] = block_groups_count1 self.blocksize = 1024 << sb_dict['s_log_block_size'] return sb_dict def block_group(self): """Get info about a block group""" names = ('bg_block_bitmap', 'bg_inode_bitmap', 'bg_inode_table', 'bg_free_blocks_count', 'bg_free_inodes_count', 'bg_used_dirs_count', 'bg_pad') fmt = '<3I4H12x' fmt_len = calcsize(fmt) pack = self.read_stream(fmt_len) blk = unpack(fmt, pack) blk_dict = dict(list(zip(names, blk))) return blk_dict def block_groups(self): """Get info about all block groups""" if self.blocksize == 1024: self.seek_stream(self.part_offset + 2 * self.blocksize) else: self.seek_stream(self.part_offset + self.blocksize) blk_groups = [] for i in range(self.sb_dict['block_groups_count']): # pylint: disable=unused-variable blk_group = self.block_group() blk_groups.append(blk_group) return blk_groups def inode_table(self, inode_pos): """Reads and returns an inode table and inode size""" names = ('i_mode', 'i_uid', 'i_size', 'i_atime', 'i_ctime', 'i_mtime', 'i_dtime', 'i_gid', 'i_links_count', 'i_blocks', 'i_flags', 'i_osd1', 'i_block0', 'i_block1', 'i_block2', 'i_block3', 'i_block4', 'i_block5', 'i_block6', 'i_block7', 'i_block8', 'i_block9', 'i_block10', 'i_block11', 'i_blocki', 'i_blockii', 'i_blockiii', 'i_generation', 'i_file_acl', 'i_dir_acl', 'i_faddr') fmt = '<2Hi4I2H3I15I4I12x' fmt_len = calcsize(fmt) inode_size = self.sb_dict['s_inode_size'] self.seek_stream(inode_pos) pack = self.read_stream(fmt_len) inode = unpack(fmt, pack) inode_dict = dict(list(zip(names, inode))) inode_dict['i_mode'] = hex(inode_dict['i_mode']) blocks = inode_dict['i_size'] / self.blocksize inode_dict['blocks'] = int(blocks) if float(int(blocks)) == blocks else int(blocks) + 1 self.read_stream(inode_size - fmt_len) return inode_dict, inode_size @staticmethod def dir_entry(chunk): """Returns the directory entry found in chunk""" dir_names = ('inode', 'rec_len', 'name_len', 'file_type', 'name') dir_fmt = '= self.bstream[1]: while seek_pos - self.bstream[1] > chunksize: self.read_stream(chunksize) self.read_stream(seek_pos - self.bstream[1]) return self.bstream[0].close() self.bstream[1] = 0 self.bstream = self.get_bstream(self.imgpath) while seek_pos - self.bstream[1] > chunksize: self.read_stream(chunksize) self.read_stream(seek_pos - self.bstream[1]) return def read_stream(self, num_of_bytes): """Read and return a chunk of the bytestream""" self.bstream[1] += num_of_bytes return self.bstream[0].read(num_of_bytes) def get_block_ids(self, inode_dict): """Get all block indices/IDs of an inode""" ids_to_read = inode_dict['blocks'] block_ids = [inode_dict['i_block' + str(i)] for i in range(12)] ids_to_read -= 12 if not inode_dict['i_blocki'] == 0: iblocks, ids_to_read = self.iblock_ids(inode_dict['i_blocki'], ids_to_read) block_ids += iblocks if not inode_dict['i_blockii'] == 0: iiblocks, ids_to_read = self.iiblock_ids(inode_dict['i_blockii'], ids_to_read) block_ids += iiblocks return block_ids[:inode_dict['blocks']] def read_file(self, block_ids): """Read blocks specified by IDs into a dict""" block_dict = {} for block_id in block_ids: percent = int(35 + 60 * block_ids.index(block_id) / len(block_ids)) self.progress.update(percent, localize(30048)) seek_pos = self.part_offset + self.blocksize * block_id self.seek_stream(seek_pos) block_dict[block_id] = self.read_stream(self.blocksize) return block_dict @staticmethod def write_file_chunk(opened_file, chunk, bytes_to_write): """Writes bytes to file in chunks""" if len(chunk) > bytes_to_write: opened_file.write(chunk[:bytes_to_write]) return 0 opened_file.write(chunk) return bytes_to_write - len(chunk) def write_file(self, inode_dict, filepath): """Writes file specified by its inode to filepath""" bytes_to_write = inode_dict['i_size'] block_ids = self.get_block_ids(inode_dict) block_ids_sorted = block_ids[:] block_ids_sorted.sort() block_dict = self.read_file(block_ids_sorted) write_dir = os.path.join(os.path.dirname(filepath), '') if not exists(write_dir): mkdirs(write_dir) with open(compat_path(filepath), 'wb') as opened_file: for block_id in block_ids: bytes_to_write = self.write_file_chunk(opened_file, block_dict[block_id], bytes_to_write) if bytes_to_write == 0: return True return False @staticmethod def get_bstream(imgpath): """Get a bytestream of the image""" if imgpath.endswith('.zip'): bstream = ZipFile(compat_path(imgpath), 'r').open(os.path.basename(imgpath).strip('.zip'), 'r') else: bstream = open(compat_path(imgpath), 'rb') return [bstream, 0]