1 module dhtslib.vcf.reader; 2 3 import std.string: fromStringz, toStringz; 4 import std.utf: toUTFz; 5 import std.traits : ReturnType; 6 import std.parallelism : totalCPUs; 7 import std.format : format; 8 9 import dhtslib.vcf; 10 import dhtslib.tabix; 11 import dhtslib.coordinates; 12 import dhtslib.memory; 13 import dhtslib.util; 14 import htslib.vcf; 15 import htslib.hts_log; 16 import htslib.kstring; 17 18 alias BCFReader = VCFReader; 19 20 /** Basic support for reading VCF, BCF files */ 21 auto VCFReader(string fn, int extra_threads = -1, UnpackLevel MAX_UNPACK = UnpackLevel.None) 22 { 23 return VCFReaderImpl!(CoordSystem.zbc, false)(fn, extra_threads, MAX_UNPACK); 24 } 25 26 /** Basic support for reading VCF, BCF files via tabix*/ 27 auto VCFReader(CoordSystem cs)(TabixIndexedFile tbxFile, string chrom, Interval!cs coords, int extra_threads = -1, UnpackLevel MAX_UNPACK = UnpackLevel.None) 28 { 29 return VCFReaderImpl!(cs,true)(tbxFile, chrom, coords, extra_threads, MAX_UNPACK); 30 } 31 32 /** Basic support for reading VCF, BCF files */ 33 struct VCFReaderImpl(CoordSystem cs, bool isTabixFile) 34 { 35 // htslib data structures 36 VcfFile fp; /// rc htsFile wrapper 37 VCFHeader vcfhdr; /// rc header wrapper 38 Bcf1 b; /// rc bcf1_t wrapper, record for use in iterator, will be recycled 39 UnpackLevel MAX_UNPACK; /// see htslib.vcf 40 41 42 private bool initialized; 43 private int success; 44 45 @disable this(); 46 static if(isTabixFile){ 47 48 TabixIndexedFile tbx; /// For tabix use 49 ReturnType!(this.initializeTabixRange) tbxRange; /// For tabix use 50 51 /// TabixIndexedFile and coordinates ctor 52 this(TabixIndexedFile tbxFile, string chrom, Interval!cs coords, int extra_threads = -1, UnpackLevel MAX_UNPACK = UnpackLevel.None) 53 { 54 this.tbx = tbxFile; 55 this.tbxRange = initializeTabixRange(chrom, coords); 56 this.tbxRange.empty; 57 /// read the header from TabixIndexedFile 58 bcf_hdr_t * hdrPtr = bcf_hdr_init(toUTFz!(char *)("r")); 59 auto err = bcf_hdr_parse(hdrPtr, toUTFz!(char *)(this.tbx.header)); 60 this.vcfhdr = VCFHeader(hdrPtr); 61 62 this.b = Bcf1(bcf_init1()); 63 this.b.max_unpack = MAX_UNPACK; 64 this.MAX_UNPACK = MAX_UNPACK; 65 this.empty; 66 } 67 68 /// copy the TabixIndexedFile.region range 69 auto initializeTabixRange(string chrom, Interval!cs coords) 70 { 71 return this.tbx.region(chrom, coords); 72 } 73 }else{ 74 /// read existing VCF file 75 /// MAX_UNPACK: setting alternate value could speed reading 76 this(string fn, int extra_threads = -1, UnpackLevel MAX_UNPACK = UnpackLevel.None) 77 { 78 import htslib.hts : hts_set_threads; 79 80 assert(!isTabixFile); 81 if (fn == "") throw new Exception("Empty filename passed to VCFReader constructor"); 82 this.fp = VcfFile(vcf_open(toStringz(fn), "r"c.ptr)); 83 if (!this.fp) throw new Exception("Could not hts_open file"); 84 85 if (extra_threads == -1) 86 { 87 if ( totalCPUs > 1) 88 { 89 hts_log_info(__FUNCTION__, 90 format("%d CPU cores detected; enabling multithreading", totalCPUs)); 91 // hts_set_threads adds N _EXTRA_ threads, so totalCPUs - 1 seemed reasonable, 92 // but overcomitting by 1 thread (i.e., passing totalCPUs) buys an extra 3% on my 2-core 2013 Mac 93 hts_set_threads(this.fp, totalCPUs); 94 } 95 } else if (extra_threads > 0) 96 { 97 if ((extra_threads + 1) > totalCPUs) 98 hts_log_warning(__FUNCTION__, "More threads requested than CPU cores detected"); 99 hts_set_threads(this.fp, extra_threads); 100 } 101 else if (extra_threads == 0) 102 { 103 hts_log_debug(__FUNCTION__, "Zero extra threads requested"); 104 } 105 106 this.vcfhdr = VCFHeader(bcf_hdr_read(this.fp)); 107 108 this.b = Bcf1(bcf_init1()); 109 this.b.max_unpack = MAX_UNPACK; 110 this.MAX_UNPACK = MAX_UNPACK; 111 this.empty; 112 } 113 } 114 115 /// Explicit postblit to avoid 116 /// https://github.com/blachlylab/dhtslib/issues/122 117 this(this) 118 { 119 this.fp = fp; 120 this.vcfhdr = vcfhdr; 121 this.b = b; 122 this.MAX_UNPACK = MAX_UNPACK; 123 } 124 125 invariant 126 { 127 assert(this.vcfhdr.hdr != null); 128 } 129 130 VCFHeader getHeader() 131 { 132 return this.vcfhdr; 133 } 134 135 /// InputRange interface: iterate over all records 136 @property bool empty() 137 { 138 static if(isTabixFile){ 139 if(this.tbxRange.empty) return true; 140 } 141 if (!this.initialized) { 142 this.popFront(); 143 this.initialized = true; 144 } 145 if (success >= 0) 146 return false; 147 else if (success == -1) 148 return true; 149 else 150 { 151 static if(isTabixFile) 152 hts_log_error(__FUNCTION__, "*** ERROR vcf_parse < -1"); 153 else 154 hts_log_error(__FUNCTION__, "*** ERROR bcf_read < -1"); 155 return true; 156 } 157 158 } 159 /// ditto 160 void popFront() 161 { 162 // int bcf_read(htsFile *fp, const(bcf_hdr_t) *h, bcf1_t *v); 163 // documentation claims returns -1 on critical errors, 0 otherwise 164 // however it looks like -1 is EOF and -2 is critical errors? 165 static if(isTabixFile){ 166 if (this.initialized) this.tbxRange.popFront; 167 auto str = this.tbxRange.front; 168 kstring_t kstr; 169 kputs(toUTFz!(char *)(str), &kstr); 170 this.success = vcf_parse(&kstr, this.vcfhdr.hdr, this.b); 171 172 }else 173 this.success = bcf_read(this.fp, this.vcfhdr.hdr, this.b); 174 175 } 176 /// ditto 177 VCFRecord front() 178 { 179 // note that VCFRecord's constructor will be responsible for 180 // * UnpackLeveling and 181 // * destroying 182 // its copy 183 auto copiedBcf_1t = bcf_dup(this.b); 184 return VCFRecord(this.vcfhdr, copiedBcf_1t, this.MAX_UNPACK); 185 } 186 187 typeof(this) save() 188 { 189 typeof(this) newRange = this; 190 static if(isTabixFile){ 191 newRange.tbx = tbx; /// For tabix use 192 newRange.tbxRange = tbxRange.save; /// For tabix use 193 }else{ 194 newRange.fp = HtsFile(copyHtsFile(this.fp)); 195 } 196 newRange.vcfhdr = vcfhdr; 197 newRange.b = Bcf1(bcf_dup(this.b)); 198 newRange.initialized = initialized; 199 newRange.success = success; 200 return newRange; 201 } 202 } 203 204 debug(dhtslib_unittest) unittest 205 { 206 import std.stdio; 207 import htslib.hts_log; 208 import std.algorithm : map, count; 209 import std.array : array; 210 import std.path : buildPath, dirName; 211 import std.math : approxEqual; 212 hts_set_log_level(htsLogLevel.HTS_LOG_INFO); 213 hts_log_info(__FUNCTION__, "Testing VCFReader (Tabix)"); 214 hts_log_info(__FUNCTION__, "Loading test file"); 215 216 auto vcf = VCFReader(buildPath(dirName(dirName(dirName(dirName(__FILE__)))),"htslib","test","tabix","vcf_file.vcf")); 217 assert(vcf.count == 14); 218 vcf = VCFReader(buildPath(dirName(dirName(dirName(dirName(__FILE__)))),"htslib","test","tabix","vcf_file.vcf")); 219 VCFRecord rec = vcf.front; 220 assert(rec.chrom == "1"); 221 assert(rec.pos == 3000149); 222 assert(rec.refAllele == "C"); 223 assert(rec.altAllelesAsArray == ["T"]); 224 assert(rec.allelesAsArray == ["C","T"]); 225 assert(approxEqual(rec.qual,59.2)); 226 assert(rec.filter == "PASS"); 227 228 vcf.popFront; 229 rec = vcf.front; 230 231 assert(rec.chrom == "1"); 232 assert(rec.pos == 3000150); 233 assert(rec.refAllele == "C"); 234 assert(rec.altAllelesAsArray == ["T"]); 235 assert(rec.allelesAsArray == ["C","T"]); 236 assert(approxEqual(rec.qual,59.2)); 237 assert(rec.filter == "PASS"); 238 239 vcf = VCFReader(buildPath(dirName(dirName(dirName(dirName(__FILE__)))),"htslib","test","tabix","vcf_file.vcf")); 240 auto range1 = vcf.save; 241 vcf.popFront; 242 auto range2 = vcf.save; 243 vcf.popFront; 244 vcf.popFront; 245 auto range3 = vcf.save; 246 vcf.popFront; 247 vcf.popFront; 248 vcf.popFront; 249 auto range4 = vcf.save; 250 assert(range1.array.length == 14); 251 assert(range2.array.length == 13); 252 assert(range3.array.length == 11); 253 assert(range4.array.length == 8); 254 } 255 256 debug(dhtslib_unittest) unittest 257 { 258 import dhtslib.util; 259 import std.stdio; 260 import htslib.hts_log; 261 import std.algorithm : map, count; 262 import std.array : array; 263 import std.path : buildPath, dirName; 264 import std.math : approxEqual; 265 hts_set_log_level(htsLogLevel.HTS_LOG_INFO); 266 hts_log_info(__FUNCTION__, "Testing VCFReader"); 267 hts_log_info(__FUNCTION__, "Loading test file"); 268 auto fn = buildPath(dirName(dirName(dirName(dirName(__FILE__)))),"htslib","test","tabix","vcf_file.vcf.gz"); 269 auto tbx = TabixIndexedFile(fn); 270 auto reg = getIntervalFromString("1:3000151-3062916"); 271 auto vcf = VCFReader(tbx, reg.contig, reg.interval); 272 assert(vcf.count == 3); 273 vcf = VCFReader(tbx, reg.contig, reg.interval); 274 VCFRecord rec = vcf.front; 275 assert(rec.chrom == "1"); 276 assert(rec.pos == 3000150); 277 assert(rec.refAllele == "C"); 278 assert(rec.altAllelesAsArray == ["T"]); 279 assert(rec.allelesAsArray == ["C","T"]); 280 assert(approxEqual(rec.qual, 59.2)); 281 assert(rec.filter == "PASS"); 282 assert(vcf.vcfhdr.getSamples == ["A", "B"]); 283 284 vcf.popFront; 285 rec = vcf.front; 286 287 assert(rec.chrom == "1"); 288 assert(rec.pos == 3062914); 289 assert(rec.refAllele == "GTTT"); 290 assert(rec.altAllelesAsArray == ["G"]); 291 assert(rec.allelesAsArray == ["GTTT","G"]); 292 assert(approxEqual(rec.qual,12.9)); 293 assert(rec.filter == "q10"); 294 // assert(rec. == ["C","T"]); 295 296 vcf = VCFReader(tbx, reg.contig, reg.interval); 297 auto range1 = vcf.save; 298 vcf.popFront; 299 auto range2 = vcf.save; 300 vcf.popFront; 301 auto range3 = vcf.save; 302 vcf.popFront; 303 auto range4 = vcf.save; 304 assert(range1.array.length == 3); 305 assert(range2.array.length == 2); 306 assert(range3.array.length == 1); 307 assert(range4.array.length == 0); 308 } 309 310 debug(dhtslib_unittest) unittest 311 { 312 import dhtslib.util; 313 import std.stdio; 314 import htslib.hts_log; 315 import std.algorithm : map, count; 316 import std.array : array; 317 import std.path : buildPath, dirName; 318 import std.math : approxEqual; 319 hts_set_log_level(htsLogLevel.HTS_LOG_INFO); 320 hts_log_info(__FUNCTION__, "Testing bcf1_t Unpacking"); 321 hts_log_info(__FUNCTION__, "Loading test file"); 322 auto fn = buildPath(dirName(dirName(dirName(dirName(__FILE__)))),"htslib","test","tabix","vcf_file.vcf.gz"); 323 auto tbx = TabixIndexedFile(fn); 324 auto reg = getIntervalFromString("1:3000151-3062916"); 325 326 auto vcf = VCFReader(tbx, reg.contig, reg.interval); 327 assert(vcf.count == 3); 328 vcf = VCFReader(tbx, reg.contig, reg.interval, UnpackLevel.None); 329 VCFRecord rec = vcf.front; 330 331 assert(rec.chrom == "1"); 332 assert(rec.pos == 3000150); 333 334 assert(approxEqual(rec.qual, 59.2)); 335 assert(rec.line.unpacked == UnpackLevel.None); 336 337 assert(rec.id == "."); 338 assert(rec.line.unpacked == UnpackLevel.AltAllele); 339 340 assert(rec.refAllele == "C"); 341 assert(rec.line.unpacked == UnpackLevel.AltAllele); 342 343 assert(rec.altAllelesAsArray == ["T"]); 344 assert(rec.line.unpacked == UnpackLevel.AltAllele); 345 346 assert(rec.filter == "PASS"); 347 assert(rec.line.unpacked == (UnpackLevel.Filter | UnpackLevel.AltAllele)); 348 349 assert(rec.getInfos["AN"].to!int == 4); 350 assert(rec.line.unpacked == UnpackLevel.SharedFields); 351 352 assert(rec.getFormats["DP"].to!int.array == [[32], [32]]); 353 assert(rec.line.unpacked == UnpackLevel.All); 354 355 /// only one extra test for pulling only format fields 356 /// 357 /// bcf_unpack automatically promotes UnpackLevel.Filter to 358 /// (UnpackLevel.Filter | UnpackLevel.AltAllele) 359 /// 360 /// bcf_unpack automatically promotes UnpackLevel.Info to 361 /// (UnpackLevel.Info | UnpackLevel.SharedFields) 362 363 /// since front calls bcf_dup, we get a fresh unpacked record 364 rec = vcf.front; 365 assert(rec.getFormats["DP"].to!int.array == [[32], [32]]); 366 assert(rec.line.unpacked == UnpackLevel.Format); 367 368 }