| |
| |
| |
| |
| """Test everything about packs reading and writing""" |
| from gitdb.test.lib import ( |
| TestBase, |
| with_rw_directory, |
| fixture_path |
| ) |
|
|
| from gitdb.stream import DeltaApplyReader |
|
|
| from gitdb.pack import ( |
| PackEntity, |
| PackIndexFile, |
| PackFile |
| ) |
|
|
| from gitdb.base import ( |
| OInfo, |
| OStream, |
| ) |
|
|
| from gitdb.fun import delta_types |
| from gitdb.exc import UnsupportedOperation |
| from gitdb.util import to_bin_sha |
|
|
| import pytest |
|
|
| import os |
| import tempfile |
|
|
|
|
| |
| def bin_sha_from_filename(filename): |
| return to_bin_sha(os.path.splitext(os.path.basename(filename))[0][5:]) |
| |
|
|
|
|
| class TestPack(TestBase): |
|
|
| packindexfile_v1 = (fixture_path('packs/pack-c0438c19fb16422b6bbcce24387b3264416d485b.idx'), 1, 67) |
| packindexfile_v2 = (fixture_path('packs/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.idx'), 2, 30) |
| packindexfile_v2_3_ascii = (fixture_path('packs/pack-a2bf8e71d8c18879e499335762dd95119d93d9f1.idx'), 2, 42) |
| packfile_v2_1 = (fixture_path('packs/pack-c0438c19fb16422b6bbcce24387b3264416d485b.pack'), 2, packindexfile_v1[2]) |
| packfile_v2_2 = (fixture_path('packs/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.pack'), 2, packindexfile_v2[2]) |
| packfile_v2_3_ascii = ( |
| fixture_path('packs/pack-a2bf8e71d8c18879e499335762dd95119d93d9f1.pack'), 2, packindexfile_v2_3_ascii[2]) |
|
|
| def _assert_index_file(self, index, version, size): |
| assert index.packfile_checksum() != index.indexfile_checksum() |
| assert len(index.packfile_checksum()) == 20 |
| assert len(index.indexfile_checksum()) == 20 |
| assert index.version() == version |
| assert index.size() == size |
| assert len(index.offsets()) == size |
|
|
| |
| for oidx in range(index.size()): |
| sha = index.sha(oidx) |
| assert oidx == index.sha_to_index(sha) |
|
|
| entry = index.entry(oidx) |
| assert len(entry) == 3 |
|
|
| assert entry[0] == index.offset(oidx) |
| assert entry[1] == sha |
| assert entry[2] == index.crc(oidx) |
|
|
| |
| for l in (4, 8, 11, 17, 20): |
| assert index.partial_sha_to_index(sha[:l], l * 2) == oidx |
|
|
| |
| self.assertRaises(ValueError, index.partial_sha_to_index, "\0", 2) |
|
|
| def _assert_pack_file(self, pack, version, size): |
| assert pack.version() == 2 |
| assert pack.size() == size |
| assert len(pack.checksum()) == 20 |
|
|
| num_obj = 0 |
| for obj in pack.stream_iter(): |
| num_obj += 1 |
| info = pack.info(obj.pack_offset) |
| stream = pack.stream(obj.pack_offset) |
|
|
| assert info.pack_offset == stream.pack_offset |
| assert info.type_id == stream.type_id |
| assert hasattr(stream, 'read') |
|
|
| |
| assert obj.read() == stream.read() |
|
|
| streams = pack.collect_streams(obj.pack_offset) |
| assert streams |
|
|
| |
| try: |
| dstream = DeltaApplyReader.new(streams) |
| except ValueError: |
| |
| |
| |
| continue |
| |
|
|
| |
| data = dstream.read() |
| assert len(data) == dstream.size |
|
|
| |
| dstream.seek(0) |
| assert dstream.read() == data |
|
|
| |
| |
| |
|
|
| |
| assert num_obj == size |
|
|
| def test_pack_index(self): |
| |
| for indexfile, version, size in (self.packindexfile_v1, self.packindexfile_v2): |
| index = PackIndexFile(indexfile) |
| self._assert_index_file(index, version, size) |
| |
|
|
| def test_pack(self): |
| |
| for packfile, version, size in (self.packfile_v2_3_ascii, self.packfile_v2_1, self.packfile_v2_2): |
| pack = PackFile(packfile) |
| self._assert_pack_file(pack, version, size) |
| |
|
|
| @with_rw_directory |
| def test_pack_entity(self, rw_dir): |
| pack_objs = list() |
| for packinfo, indexinfo in ((self.packfile_v2_1, self.packindexfile_v1), |
| (self.packfile_v2_2, self.packindexfile_v2), |
| (self.packfile_v2_3_ascii, self.packindexfile_v2_3_ascii)): |
| packfile, version, size = packinfo |
| indexfile, version, size = indexinfo |
| entity = PackEntity(packfile) |
| assert entity.pack().path() == packfile |
| assert entity.index().path() == indexfile |
| pack_objs.extend(entity.stream_iter()) |
|
|
| count = 0 |
| for info, stream in zip(entity.info_iter(), entity.stream_iter()): |
| count += 1 |
| assert info.binsha == stream.binsha |
| assert len(info.binsha) == 20 |
| assert info.type_id == stream.type_id |
| assert info.size == stream.size |
|
|
| |
| assert not info.type_id in delta_types |
|
|
| |
| assert len(entity.collect_streams(info.binsha)) |
| oinfo = entity.info(info.binsha) |
| assert isinstance(oinfo, OInfo) |
| assert oinfo.binsha is not None |
| ostream = entity.stream(info.binsha) |
| assert isinstance(ostream, OStream) |
| assert ostream.binsha is not None |
|
|
| |
| try: |
| assert entity.is_valid_stream(info.binsha, use_crc=True) |
| except UnsupportedOperation: |
| pass |
| |
| assert entity.is_valid_stream(info.binsha, use_crc=False) |
| |
| assert count == size |
|
|
| |
|
|
| |
| |
| pack_path1 = tempfile.mktemp('', "pack1", rw_dir) |
| pack_path2 = tempfile.mktemp('', "pack2", rw_dir) |
| index_path = tempfile.mktemp('', 'index', rw_dir) |
| iteration = 0 |
|
|
| def rewind_streams(): |
| for obj in pack_objs: |
| obj.stream.seek(0) |
| |
| for ppath, ipath, num_obj in zip((pack_path1, pack_path2), |
| (index_path, None), |
| (len(pack_objs), None)): |
| iwrite = None |
| if ipath: |
| ifile = open(ipath, 'wb') |
| iwrite = ifile.write |
| |
|
|
| |
| if iteration > 0: |
| rewind_streams() |
| |
| iteration += 1 |
|
|
| with open(ppath, 'wb') as pfile: |
| pack_sha, index_sha = PackEntity.write_pack(pack_objs, pfile.write, iwrite, object_count=num_obj) |
| assert os.path.getsize(ppath) > 100 |
|
|
| |
| pf = PackFile(ppath) |
| assert pf.size() == len(pack_objs) |
| assert pf.version() == PackFile.pack_version_default |
| assert pf.checksum() == pack_sha |
| pf.close() |
|
|
| |
| if ipath is not None: |
| ifile.close() |
| assert os.path.getsize(ipath) > 100 |
| idx = PackIndexFile(ipath) |
| assert idx.version() == PackIndexFile.index_version_default |
| assert idx.packfile_checksum() == pack_sha |
| assert idx.indexfile_checksum() == index_sha |
| assert idx.size() == len(pack_objs) |
| idx.close() |
| |
| |
|
|
| |
| rewind_streams() |
| entity = PackEntity.create(pack_objs, rw_dir) |
| count = 0 |
| for info in entity.info_iter(): |
| count += 1 |
| for use_crc in range(2): |
| assert entity.is_valid_stream(info.binsha, use_crc) |
| |
| |
| assert count == len(pack_objs) |
| entity.close() |
|
|
| def test_pack_64(self): |
| |
| |
| pytest.skip('not implemented') |
|
|