|
20 | 20 |
|
21 | 21 | """Tests for dulwich.graph."""
|
22 | 22 |
|
23 |
| -from dulwich.graph import WorkList, _find_lcas, can_fast_forward |
| 23 | +from dulwich.graph import ( |
| 24 | + WorkList, |
| 25 | + _find_lcas, |
| 26 | + can_fast_forward, |
| 27 | + find_merge_base, |
| 28 | + find_octopus_base, |
| 29 | +) |
24 | 30 | from dulwich.repo import MemoryRepo
|
25 | 31 | from dulwich.tests.utils import make_commit
|
26 | 32 |
|
@@ -168,6 +174,84 @@ def lookup_stamp(commit_id) -> int:
|
168 | 174 | self.assertEqual(set(lcas), {"2"})
|
169 | 175 |
|
170 | 176 |
|
| 177 | +class FindMergeBaseFunctionTests(TestCase): |
| 178 | + def test_find_merge_base_empty(self) -> None: |
| 179 | + r = MemoryRepo() |
| 180 | + # Empty list of commits |
| 181 | + self.assertEqual([], find_merge_base(r, [])) |
| 182 | + |
| 183 | + def test_find_merge_base_single(self) -> None: |
| 184 | + r = MemoryRepo() |
| 185 | + base = make_commit() |
| 186 | + r.object_store.add_objects([(base, None)]) |
| 187 | + # Single commit returns itself |
| 188 | + self.assertEqual([base.id], find_merge_base(r, [base.id])) |
| 189 | + |
| 190 | + def test_find_merge_base_identical(self) -> None: |
| 191 | + r = MemoryRepo() |
| 192 | + base = make_commit() |
| 193 | + r.object_store.add_objects([(base, None)]) |
| 194 | + # When the same commit is in both positions |
| 195 | + self.assertEqual([base.id], find_merge_base(r, [base.id, base.id])) |
| 196 | + |
| 197 | + def test_find_merge_base_linear(self) -> None: |
| 198 | + r = MemoryRepo() |
| 199 | + base = make_commit() |
| 200 | + c1 = make_commit(parents=[base.id]) |
| 201 | + c2 = make_commit(parents=[c1.id]) |
| 202 | + r.object_store.add_objects([(base, None), (c1, None), (c2, None)]) |
| 203 | + # Base of c1 and c2 is c1 |
| 204 | + self.assertEqual([c1.id], find_merge_base(r, [c1.id, c2.id])) |
| 205 | + # Base of c2 and c1 is c1 |
| 206 | + self.assertEqual([c1.id], find_merge_base(r, [c2.id, c1.id])) |
| 207 | + |
| 208 | + def test_find_merge_base_diverged(self) -> None: |
| 209 | + r = MemoryRepo() |
| 210 | + base = make_commit() |
| 211 | + c1 = make_commit(parents=[base.id]) |
| 212 | + c2a = make_commit(parents=[c1.id], message=b"2a") |
| 213 | + c2b = make_commit(parents=[c1.id], message=b"2b") |
| 214 | + r.object_store.add_objects([(base, None), (c1, None), (c2a, None), (c2b, None)]) |
| 215 | + # Merge base of two diverged commits is their common parent |
| 216 | + self.assertEqual([c1.id], find_merge_base(r, [c2a.id, c2b.id])) |
| 217 | + |
| 218 | + |
| 219 | +class FindOctopusBaseTests(TestCase): |
| 220 | + def test_find_octopus_base_empty(self) -> None: |
| 221 | + r = MemoryRepo() |
| 222 | + # Empty list of commits |
| 223 | + self.assertEqual([], find_octopus_base(r, [])) |
| 224 | + |
| 225 | + def test_find_octopus_base_single(self) -> None: |
| 226 | + r = MemoryRepo() |
| 227 | + base = make_commit() |
| 228 | + r.object_store.add_objects([(base, None)]) |
| 229 | + # Single commit returns itself |
| 230 | + self.assertEqual([base.id], find_octopus_base(r, [base.id])) |
| 231 | + |
| 232 | + def test_find_octopus_base_two_commits(self) -> None: |
| 233 | + r = MemoryRepo() |
| 234 | + base = make_commit() |
| 235 | + c1 = make_commit(parents=[base.id]) |
| 236 | + c2 = make_commit(parents=[c1.id]) |
| 237 | + r.object_store.add_objects([(base, None), (c1, None), (c2, None)]) |
| 238 | + # With two commits it should call find_merge_base |
| 239 | + self.assertEqual([c1.id], find_octopus_base(r, [c1.id, c2.id])) |
| 240 | + |
| 241 | + def test_find_octopus_base_multiple(self) -> None: |
| 242 | + r = MemoryRepo() |
| 243 | + base = make_commit() |
| 244 | + c1 = make_commit(parents=[base.id]) |
| 245 | + c2a = make_commit(parents=[c1.id], message=b"2a") |
| 246 | + c2b = make_commit(parents=[c1.id], message=b"2b") |
| 247 | + c2c = make_commit(parents=[c1.id], message=b"2c") |
| 248 | + r.object_store.add_objects( |
| 249 | + [(base, None), (c1, None), (c2a, None), (c2b, None), (c2c, None)] |
| 250 | + ) |
| 251 | + # Common ancestor of all three branches |
| 252 | + self.assertEqual([c1.id], find_octopus_base(r, [c2a.id, c2b.id, c2c.id])) |
| 253 | + |
| 254 | + |
171 | 255 | class CanFastForwardTests(TestCase):
|
172 | 256 | def test_ff(self) -> None:
|
173 | 257 | r = MemoryRepo()
|
@@ -207,3 +291,33 @@ def test_WorkList(self) -> None:
|
207 | 291 | wlst.add((150, "Test Value 4"))
|
208 | 292 | self.assertEqual(wlst.get(), (150, "Test Value 4"))
|
209 | 293 | self.assertEqual(wlst.get(), (50, "Test Value 2"))
|
| 294 | + |
| 295 | + def test_WorkList_iter(self) -> None: |
| 296 | + # Test the iter method of WorkList |
| 297 | + wlst = WorkList() |
| 298 | + wlst.add((100, "Value 1")) |
| 299 | + wlst.add((200, "Value 2")) |
| 300 | + wlst.add((50, "Value 3")) |
| 301 | + |
| 302 | + # Collect all items from iter |
| 303 | + items = list(wlst.iter()) |
| 304 | + |
| 305 | + # Items should be in their original order, not sorted |
| 306 | + self.assertEqual(len(items), 3) |
| 307 | + |
| 308 | + # Check the values are present with correct timestamps |
| 309 | + timestamps = [dt for dt, _ in items] |
| 310 | + values = [val for _, val in items] |
| 311 | + |
| 312 | + self.assertIn(100, timestamps) |
| 313 | + self.assertIn(200, timestamps) |
| 314 | + self.assertIn(50, timestamps) |
| 315 | + self.assertIn("Value 1", values) |
| 316 | + self.assertIn("Value 2", values) |
| 317 | + self.assertIn("Value 3", values) |
| 318 | + |
| 319 | + def test_WorkList_empty_get(self) -> None: |
| 320 | + # Test getting from an empty WorkList |
| 321 | + wlst = WorkList() |
| 322 | + with self.assertRaises(IndexError): |
| 323 | + wlst.get() |
0 commit comments