1 module dhtslib.vcf.reader;
2 
3 import std.string: fromStringz, toStringz;
4 import std.utf: toUTFz;
5 import std.traits : ReturnType;
6 import std.parallelism : totalCPUs;
7 import std.format : format;
8 
9 import dhtslib.vcf;
10 import dhtslib.tabix;
11 import dhtslib.coordinates;
12 import dhtslib.memory;
13 import dhtslib.util;
14 import htslib.vcf;
15 import htslib.hts_log;
16 import htslib.kstring;
17 
18 alias BCFReader = VCFReader;
19 
20 /** Basic support for reading VCF, BCF files */
21 auto VCFReader(string fn, int extra_threads = -1, UnpackLevel MAX_UNPACK = UnpackLevel.None)
22 {
23     return VCFReaderImpl!(CoordSystem.zbc, false)(fn, extra_threads, MAX_UNPACK);
24 }
25 
26 /** Basic support for reading VCF, BCF files via tabix*/
27 auto VCFReader(CoordSystem cs)(TabixIndexedFile tbxFile, string chrom, Interval!cs coords, int extra_threads = -1, UnpackLevel MAX_UNPACK = UnpackLevel.None)
28 {
29     return VCFReaderImpl!(cs,true)(tbxFile, chrom, coords, extra_threads, MAX_UNPACK);
30 }
31 
32 /** Basic support for reading VCF, BCF files */
33 struct VCFReaderImpl(CoordSystem cs, bool isTabixFile)
34 {
35     // htslib data structures
36     VcfFile     fp;    /// rc htsFile wrapper
37     VCFHeader   vcfhdr;    /// rc header wrapper
38     Bcf1 b;          /// rc bcf1_t wrapper, record for use in iterator, will be recycled
39     UnpackLevel MAX_UNPACK;     /// see htslib.vcf
40 
41 
42     private bool initialized;
43     private int success;
44 
45     @disable this();
46     static if(isTabixFile){
47 
48         TabixIndexedFile tbx; /// For tabix use
49         ReturnType!(this.initializeTabixRange) tbxRange; /// For tabix use
50 
51         /// TabixIndexedFile and coordinates ctor
52         this(TabixIndexedFile tbxFile, string chrom, Interval!cs coords, int extra_threads = -1, UnpackLevel MAX_UNPACK = UnpackLevel.None)
53         {
54             this.tbx = tbxFile;
55             this.tbxRange = initializeTabixRange(chrom, coords);
56             this.tbxRange.empty;
57             /// read the header from TabixIndexedFile
58             bcf_hdr_t * hdrPtr = bcf_hdr_init(toUTFz!(char *)("r"));
59             auto err = bcf_hdr_parse(hdrPtr, toUTFz!(char *)(this.tbx.header));
60             this.vcfhdr = VCFHeader(hdrPtr);
61             
62             this.b = Bcf1(bcf_init1());
63             this.b.max_unpack = MAX_UNPACK;
64             this.MAX_UNPACK = MAX_UNPACK;
65             this.empty;
66         }
67 
68         /// copy the TabixIndexedFile.region range
69         auto initializeTabixRange(string chrom, Interval!cs coords)
70         {
71             return this.tbx.region(chrom, coords);
72         }
73     }else{
74         /// read existing VCF file
75         /// MAX_UNPACK: setting alternate value could speed reading
76         this(string fn, int extra_threads = -1, UnpackLevel MAX_UNPACK = UnpackLevel.None)
77         {
78             import htslib.hts : hts_set_threads;
79 
80             assert(!isTabixFile);
81             if (fn == "") throw new Exception("Empty filename passed to VCFReader constructor");
82             this.fp = VcfFile(vcf_open(toStringz(fn), "r"c.ptr));
83             if (!this.fp) throw new Exception("Could not hts_open file");
84 
85             if (extra_threads == -1)
86             {
87                 if ( totalCPUs > 1)
88                 {
89                     hts_log_info(__FUNCTION__,
90                             format("%d CPU cores detected; enabling multithreading", totalCPUs));
91                     // hts_set_threads adds N _EXTRA_ threads, so totalCPUs - 1 seemed reasonable,
92                     // but overcomitting by 1 thread (i.e., passing totalCPUs) buys an extra 3% on my 2-core 2013 Mac
93                     hts_set_threads(this.fp, totalCPUs);
94                 }
95             } else if (extra_threads > 0)
96             {
97                 if ((extra_threads + 1) > totalCPUs)
98                     hts_log_warning(__FUNCTION__, "More threads requested than CPU cores detected");
99                 hts_set_threads(this.fp, extra_threads);
100             }
101             else if (extra_threads == 0)
102             {
103                 hts_log_debug(__FUNCTION__, "Zero extra threads requested");
104             }
105 
106             this.vcfhdr = VCFHeader(bcf_hdr_read(this.fp));
107 
108             this.b = Bcf1(bcf_init1());
109             this.b.max_unpack = MAX_UNPACK;
110             this.MAX_UNPACK = MAX_UNPACK;
111             this.empty;
112         }
113     }
114 
115     /// Explicit postblit to avoid 
116     /// https://github.com/blachlylab/dhtslib/issues/122
117     this(this)
118     {
119         this.fp = fp;
120         this.vcfhdr = vcfhdr;
121         this.b = b;
122         this.MAX_UNPACK = MAX_UNPACK;
123     }
124 
125     invariant
126     {
127         assert(this.vcfhdr.hdr != null);
128     }
129 
130     VCFHeader getHeader()
131     {
132         return this.vcfhdr;
133     }
134 
135     /// InputRange interface: iterate over all records
136     @property bool empty()
137     {
138         static if(isTabixFile){
139             if(this.tbxRange.empty) return true;
140         }
141         if (!this.initialized) {
142             this.popFront();
143             this.initialized = true;
144         }
145         if (success >= 0)
146             return false;
147         else if (success == -1)
148             return true;
149         else
150         {
151             static if(isTabixFile)
152                 hts_log_error(__FUNCTION__, "*** ERROR vcf_parse < -1");
153             else
154                 hts_log_error(__FUNCTION__, "*** ERROR bcf_read < -1");
155             return true;
156         }
157         
158     }
159     /// ditto
160     void popFront()
161     {
162         //     int bcf_read(htsFile *fp, const(bcf_hdr_t) *h, bcf1_t *v);
163         // documentation claims returns -1 on critical errors, 0 otherwise
164         // however it looks like -1 is EOF and -2 is critical errors?
165         static if(isTabixFile){
166             if (this.initialized) this.tbxRange.popFront;
167             auto str = this.tbxRange.front;
168             kstring_t kstr;
169             kputs(toUTFz!(char *)(str), &kstr);
170             this.success = vcf_parse(&kstr, this.vcfhdr.hdr, this.b);
171             
172         }else
173             this.success = bcf_read(this.fp, this.vcfhdr.hdr, this.b);
174 
175     }
176     /// ditto
177     VCFRecord front()
178     {
179         // note that VCFRecord's constructor will be responsible for
180         // * UnpackLeveling and
181         // * destroying
182         // its copy
183         auto copiedBcf_1t = bcf_dup(this.b);
184         return VCFRecord(this.vcfhdr, copiedBcf_1t, this.MAX_UNPACK);
185     }
186 
187     typeof(this) save()
188     {
189         typeof(this) newRange = this;
190         static if(isTabixFile){
191             newRange.tbx = tbx; /// For tabix use
192             newRange.tbxRange = tbxRange.save; /// For tabix use
193         }else{
194             newRange.fp = HtsFile(copyHtsFile(this.fp));
195         }
196         newRange.vcfhdr = vcfhdr;
197         newRange.b = Bcf1(bcf_dup(this.b));
198         newRange.initialized = initialized;
199         newRange.success = success;
200         return newRange;
201     }
202 }
203 
204 debug(dhtslib_unittest) unittest
205 {
206     import std.stdio;
207     import htslib.hts_log;
208     import std.algorithm : map, count;
209     import std.array : array;
210     import std.path : buildPath, dirName;
211     import std.math : approxEqual;
212     hts_set_log_level(htsLogLevel.HTS_LOG_INFO);
213     hts_log_info(__FUNCTION__, "Testing VCFReader (Tabix)");
214     hts_log_info(__FUNCTION__, "Loading test file");
215 
216     auto vcf = VCFReader(buildPath(dirName(dirName(dirName(dirName(__FILE__)))),"htslib","test","tabix","vcf_file.vcf"));
217     assert(vcf.count == 14);
218     vcf = VCFReader(buildPath(dirName(dirName(dirName(dirName(__FILE__)))),"htslib","test","tabix","vcf_file.vcf"));
219     VCFRecord rec = vcf.front;
220     assert(rec.chrom == "1");
221     assert(rec.pos == 3000149);
222     assert(rec.refAllele == "C");
223     assert(rec.altAllelesAsArray == ["T"]);
224     assert(rec.allelesAsArray == ["C","T"]);
225     assert(approxEqual(rec.qual,59.2));
226     assert(rec.filter == "PASS");
227     
228     vcf.popFront;
229     rec = vcf.front;
230 
231     assert(rec.chrom == "1");
232     assert(rec.pos == 3000150);
233     assert(rec.refAllele == "C");
234     assert(rec.altAllelesAsArray == ["T"]);
235     assert(rec.allelesAsArray == ["C","T"]);
236     assert(approxEqual(rec.qual,59.2));
237     assert(rec.filter == "PASS");
238     
239     vcf = VCFReader(buildPath(dirName(dirName(dirName(dirName(__FILE__)))),"htslib","test","tabix","vcf_file.vcf"));
240     auto range1 = vcf.save;
241     vcf.popFront;
242     auto range2 = vcf.save;
243     vcf.popFront;
244     vcf.popFront;
245     auto range3 = vcf.save;
246     vcf.popFront;
247     vcf.popFront;
248     vcf.popFront;
249     auto range4 = vcf.save;
250     assert(range1.array.length == 14);
251     assert(range2.array.length == 13);
252     assert(range3.array.length == 11);
253     assert(range4.array.length == 8);
254 }
255 
256 debug(dhtslib_unittest) unittest
257 {
258     import dhtslib.util;
259     import std.stdio;
260     import htslib.hts_log;
261     import std.algorithm : map, count;
262     import std.array : array;
263     import std.path : buildPath, dirName;
264     import std.math : approxEqual;
265     hts_set_log_level(htsLogLevel.HTS_LOG_INFO);
266     hts_log_info(__FUNCTION__, "Testing VCFReader");
267     hts_log_info(__FUNCTION__, "Loading test file");
268     auto fn = buildPath(dirName(dirName(dirName(dirName(__FILE__)))),"htslib","test","tabix","vcf_file.vcf.gz");
269     auto tbx = TabixIndexedFile(fn);
270     auto reg = getIntervalFromString("1:3000151-3062916");
271     auto vcf = VCFReader(tbx, reg.contig, reg.interval);
272     assert(vcf.count == 3);
273     vcf = VCFReader(tbx, reg.contig, reg.interval);
274     VCFRecord rec = vcf.front;
275     assert(rec.chrom == "1");
276     assert(rec.pos == 3000150);
277     assert(rec.refAllele == "C");
278     assert(rec.altAllelesAsArray == ["T"]);
279     assert(rec.allelesAsArray == ["C","T"]);
280     assert(approxEqual(rec.qual, 59.2));
281     assert(rec.filter == "PASS");
282     assert(vcf.vcfhdr.getSamples == ["A", "B"]);
283 
284     vcf.popFront;
285     rec = vcf.front;
286 
287     assert(rec.chrom == "1");
288     assert(rec.pos == 3062914);
289     assert(rec.refAllele == "GTTT");
290     assert(rec.altAllelesAsArray == ["G"]);
291     assert(rec.allelesAsArray == ["GTTT","G"]);
292     assert(approxEqual(rec.qual,12.9));
293     assert(rec.filter == "q10");
294     // assert(rec. == ["C","T"]);
295 
296     vcf = VCFReader(tbx, reg.contig, reg.interval);
297     auto range1 = vcf.save;
298     vcf.popFront;
299     auto range2 = vcf.save;
300     vcf.popFront;
301     auto range3 = vcf.save;
302     vcf.popFront;
303     auto range4 = vcf.save;
304     assert(range1.array.length == 3);
305     assert(range2.array.length == 2);
306     assert(range3.array.length == 1);
307     assert(range4.array.length == 0);
308 }
309 
310 debug(dhtslib_unittest) unittest
311 {
312     import dhtslib.util;
313     import std.stdio;
314     import htslib.hts_log;
315     import std.algorithm : map, count;
316     import std.array : array;
317     import std.path : buildPath, dirName;
318     import std.math : approxEqual;
319     hts_set_log_level(htsLogLevel.HTS_LOG_INFO);
320     hts_log_info(__FUNCTION__, "Testing bcf1_t Unpacking");
321     hts_log_info(__FUNCTION__, "Loading test file");
322     auto fn = buildPath(dirName(dirName(dirName(dirName(__FILE__)))),"htslib","test","tabix","vcf_file.vcf.gz");
323     auto tbx = TabixIndexedFile(fn);
324     auto reg = getIntervalFromString("1:3000151-3062916");
325 
326     auto vcf = VCFReader(tbx, reg.contig, reg.interval);
327     assert(vcf.count == 3);
328     vcf = VCFReader(tbx, reg.contig, reg.interval, UnpackLevel.None);
329     VCFRecord rec = vcf.front;
330 
331     assert(rec.chrom == "1");
332     assert(rec.pos == 3000150);
333 
334     assert(approxEqual(rec.qual, 59.2));
335     assert(rec.line.unpacked == UnpackLevel.None);
336 
337     assert(rec.id == ".");
338     assert(rec.line.unpacked == UnpackLevel.AltAllele);
339 
340     assert(rec.refAllele == "C");
341     assert(rec.line.unpacked == UnpackLevel.AltAllele);
342 
343     assert(rec.altAllelesAsArray == ["T"]);
344     assert(rec.line.unpacked == UnpackLevel.AltAllele);
345 
346     assert(rec.filter == "PASS");
347     assert(rec.line.unpacked == (UnpackLevel.Filter | UnpackLevel.AltAllele));
348 
349     assert(rec.getInfos["AN"].to!int == 4);
350     assert(rec.line.unpacked == UnpackLevel.SharedFields);
351 
352     assert(rec.getFormats["DP"].to!int.array == [[32], [32]]);
353     assert(rec.line.unpacked == UnpackLevel.All);
354 
355     /// only one extra test for pulling only format fields
356     ///
357     /// bcf_unpack automatically promotes UnpackLevel.Filter to
358     /// (UnpackLevel.Filter | UnpackLevel.AltAllele)
359     ///
360     /// bcf_unpack automatically promotes UnpackLevel.Info to
361     /// (UnpackLevel.Info | UnpackLevel.SharedFields)
362 
363     /// since front calls bcf_dup, we get a fresh unpacked record
364     rec = vcf.front;
365     assert(rec.getFormats["DP"].to!int.array == [[32], [32]]);
366     assert(rec.line.unpacked == UnpackLevel.Format);
367     
368 }