@@ -1,8 +1,9 @@
 """BGEN reader implementation (using bgen_reader)"""
 from pathlib import Path
-from typing import Any, Union
+from typing import Any, Dict, Tuple, Union
 
 import dask.array as da
+import dask.dataframe as dd
 import numpy as np
 from bgen_reader._bgen_file import bgen_file
 from bgen_reader._bgen_metafile import bgen_metafile
@@ -18,7 +19,7 @@
 PathType = Union[str, Path]
 
 
-def _to_dict(df, dtype=None):
+def _to_dict(df: dd.DataFrame, dtype: Any = None) -> Dict[str, da.Array]:
     return {
         c: df[c].to_dask_array(lengths=True).astype(dtype[c] if dtype else df[c].dtype)
         for c in df
@@ -42,7 +43,9 @@ class BgenReader:
 
     name = "bgen_reader"
 
-    def __init__(self, path, persist=True, dtype=np.float32):
+    def __init__(
+        self, path: PathType, persist: bool = True, dtype: Any = np.float32
+    ) -> None:
         self.path = Path(path)
 
         self.metafile_filepath = infer_metafile_filepath(Path(self.path))
@@ -63,11 +66,13 @@ def __init__(self, path, persist=True, dtype=np.float32):
         self.contig = variant_arrs["chrom"]
         self.pos = variant_arrs["pos"]
 
-        def split_alleles(alleles, block_info=None):
+        def split_alleles(
+            alleles: np.ndarray, block_info: Any = None
+        ) -> np.ndarray:
             if block_info is None or len(block_info) == 0:
                 return alleles
 
-            def split(allele_row):
+            def split(allele_row: np.ndarray) -> np.ndarray:
                 alleles_list = allele_row[0].split(",")
                 assert len(alleles_list) == 2  # bi-allelic
                 return np.array(alleles_list)
@@ -98,7 +103,7 @@ def max_str_len(arr: ArrayLike) -> Any:
         self.dtype = dtype
         self.ndim = 3
 
-    def __getitem__(self, idx):
+    def __getitem__(self, idx: Any) -> np.ndarray:
         if not isinstance(idx, tuple):
             raise IndexError(f"Indexer must be tuple (received {type(idx)})")
         if len(idx) != self.ndim:
@@ -150,11 +155,11 @@ def __getitem__(self, idx):
                 if res is None:
                     res = np.zeros((len(all_vaddr), len(probs), 3), dtype=self.dtype)
                 res[i] = probs
-            res = res[..., idx[2]]
+            res = res[..., idx[2]]  # type: ignore[index]
             return np.squeeze(res, axis=squeeze_dims)
 
 
-def _to_dosage(probs: ArrayLike):
+def _to_dosage(probs: ArrayLike) -> ArrayLike:
     """Calculate the dosage from genotype likelihoods (probabilities)"""
     assert (
         probs.shape[-1] == 3
@@ -164,7 +169,7 @@ def _to_dosage(probs: ArrayLike):
 
 def read_bgen(
     path: PathType,
-    chunks: Union[str, int, tuple] = "auto",
+    chunks: Union[str, int, Tuple[int, ...]] = "auto",
     lock: bool = False,
     persist: bool = True,
 ) -> Dataset:
@@ -217,7 +222,7 @@ def read_bgen(
     )
     call_dosage = _to_dosage(call_genotype_probability)
 
-    ds = create_genotype_dosage_dataset(
+    ds: Dataset = create_genotype_dosage_dataset(
         variant_contig_names=variant_contig_names,
         variant_contig=variant_contig,
         variant_position=variant_position,
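
A minimal usage sketch (not part of the commit) of the `read_bgen` entry point whose signature is annotated above; the `sgkit_bgen` import path and the example file name are assumptions rather than details taken from this diff.

    # Hypothetical usage; the module path and BGEN file name are assumed.
    from sgkit_bgen import read_bgen

    # Builds an xarray Dataset of dosages and genotype probabilities backed by
    # dask arrays wrapping the BgenReader shown in the diff.
    ds = read_bgen("example.bgen", chunks="auto", persist=True)
    print(ds)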