1 /** 2 Module providing a wrapper, `TabixIndexedFile` over a line-oriented NGS flat file, 3 such as BED, GFF3, VCF that has been indexed with tabix. 4 5 The wrapper provides a list of reference sequence names, as well as iterator over 6 all rows falling within a sequence range, e.g. "chr1:1000-2000" 7 */ 8 9 module dhtslib.tabix; 10 11 import std.stdio; 12 import std.string; 13 import std.traits : ReturnType; 14 import std.range : inputRangeObject, InputRangeObject; 15 import std.parallelism : totalCPUs; 16 import core.stdc.stdlib : malloc, free; 17 18 import htslib.hts; 19 import htslib.hts_log; 20 import htslib.kstring; 21 import htslib.tbx; 22 import dhtslib.coordinates; 23 import dhtslib.memory; 24 import dhtslib.util; 25 //import htslib.regidx; 26 27 /** Encapsulates a position-sorted record-oriented NGS flat file, 28 * indexed with Tabix, including BED, GFF3, VCF. 29 * 30 * region(string r) returns an InputRange that iterates through rows of the file intersecting or contained within the requested range 31 */ 32 struct TabixIndexedFile { 33 34 HtsFile fp; /// rc wrapper around htsFile ptr 35 Tbx tbx; /// rc wrapper around tabix ptr 36 37 string header; /// NGS flat file's header (if any; e.g. BED may not have one) 38 39 /// Initialize with a complete file path name to the tabix-indexed file 40 /// The tabix index (.tbi) must already exist alongside 41 this(const(char)[] fn, int extra_threads = -1, const(char)[] fntbi = "") 42 { 43 debug(dhtslib_debug) { writeln("TabixIndexedFile ctor"); } 44 this.fp = HtsFile(hts_open( toStringz(fn), "r")); 45 if ( !this.fp ) { 46 writefln("Could not read %s\n", fn); 47 throw new Exception("Couldn't read file"); 48 } 49 50 if (extra_threads == -1) 51 { 52 if ( totalCPUs > 1) 53 { 54 hts_log_info(__FUNCTION__, 55 format("%d CPU cores detected; enabling multithreading", totalCPUs)); 56 // hts_set_threads adds N _EXTRA_ threads, so totalCPUs - 1 seemed reasonable, 57 // but overcomitting by 1 thread (i.e., passing totalCPUs) buys an extra 3% on my 2-core 2013 Mac 58 hts_set_threads(this.fp, totalCPUs); 59 } 60 } else if (extra_threads > 0) 61 { 62 if ((extra_threads + 1) > totalCPUs) 63 hts_log_warning(__FUNCTION__, "More threads requested than CPU cores detected"); 64 hts_set_threads(this.fp, extra_threads); 65 } 66 else if (extra_threads == 0) 67 { 68 hts_log_debug(__FUNCTION__, "Zero extra threads requested"); 69 } 70 71 //enum htsExactFormat format = hts_get_format(fp)->format; 72 if(fntbi!="") this.tbx = Tbx(tbx_index_load2( toStringz(fn), toStringz(fntbi) )); 73 else this.tbx = Tbx(tbx_index_load( toStringz(fn) )); 74 if (!this.tbx) { 75 writefln("Could not load .tbi index of %s\n", fn ); 76 throw new Exception("Couldn't load tabix index file"); 77 } 78 79 loadHeader(); 80 } 81 82 /// Explicit postblit to avoid 83 /// https://github.com/blachlylab/dhtslib/issues/122 84 this(this) 85 { 86 this.fp = fp; 87 this.tbx = tbx; 88 this.header = header; 89 } 90 91 private void loadHeader() 92 { 93 kstring_t str; 94 scope(exit) { free(str.s); } 95 96 while ( hts_getline(this.fp, '\n', &str) >= 0 ) 97 { 98 if ( !str.l || str.s[0] != this.tbx.conf.meta_char ) break; 99 this.header ~= fromStringz(str.s) ~ '\n'; 100 } 101 102 debug(dhtslib_debug) { writeln("end loadHeader"); } 103 } 104 105 /// tbx.d: const(char **) tbx_seqnames(tbx_t *tbx, int *n); // free the array but not the values 106 @property string[] sequenceNames() 107 { 108 // TODO: Check for memory leaks (free the array after copy into sequence_names) 109 int nseqs; 110 111 string[] sequence_names; 112 113 const(char **) seqnames = tbx_seqnames(this.tbx, &nseqs); 114 115 for(int i; i<nseqs; i++) { 116 sequence_names ~= cast(string) fromStringz(seqnames[i]); 117 } 118 119 return sequence_names; 120 } 121 122 /** region(r) 123 * returns an InputRange that iterates through rows of the file intersecting or contained within the requested range 124 */ 125 auto region(CoordSystem cs)(string chrom, Interval!cs coords) 126 { 127 auto newCoords = coords.to!(CoordSystem.zbho); 128 struct Region 129 { 130 131 /** TODO: determine how thread-(un)safe this is (i.e., using a potentially shared *fp and *tbx) */ 132 private HtsFile fp; 133 private Tbx tbx; 134 135 private HtsItr itr; 136 private string next; 137 138 // necessary because the alternative strategy of preloading the first row 139 // leads to problems when Struct inst is blitted -> 140 // re-iterating always returns first row only (since *itr is expended 141 // but first row was preloaded in this.next) 142 private bool active; 143 private string chrom; 144 145 this(HtsFile fp, Tbx tbx, string chrom, Interval!(CoordSystem.zbho) coords) 146 { 147 this.fp = fp; 148 this.tbx = tbx; 149 this.chrom = chrom; 150 this.itr = HtsItr(tbx_itr_queryi(tbx, tbx_name2id(tbx, toStringz(this.chrom)), coords.start, coords.end)); 151 this.empty; 152 debug(dhtslib_debug) { writeln("Region ctor // this.itr: ", this.itr); } 153 } 154 155 // had to remove "const" property from empty() due to manipulation of this.active 156 @property bool empty() { 157 158 if (!this.active) { 159 // this is the first call to empty() (and use of the range) 160 // Let's make it active and attempt to load the first record, if one exists 161 this.active = true; 162 this.popFront(); 163 } 164 165 if (!this.next) return true; 166 else return false; 167 } 168 169 @property string front() const { 170 return this.next; 171 } 172 173 void popFront() { 174 // closure over fp and tbx? (i.e. potentially unsafe?) 175 176 // Get next entry 177 kstring_t kstr; 178 immutable res = tbx_itr_next(this.fp, this.tbx, this.itr, &kstr); 179 if (res < 0) { 180 // we are done 181 this.next = null; 182 } else { 183 // Otherwise load into next 184 this.next = fromStringz(kstr.s).idup; 185 free(kstr.s); 186 } 187 } 188 Region save() 189 { 190 Region newRange; 191 newRange.fp = HtsFile(copyHtsFile(fp)); 192 newRange.itr = HtsItr(copyHtsItr(itr)); 193 newRange.tbx = tbx; 194 newRange.next = next; 195 newRange.active = active; 196 newRange.chrom = chrom; 197 return newRange; 198 } 199 } 200 201 return Region(this.fp, this.tbx, chrom, newCoords); 202 } 203 204 } 205 206 // TODO: figure out how to make this unittest with just htslib files 207 208 // debug(dhtslib_unittest) unittest 209 // { 210 // import htslib.hts_log; 211 // import dhtslib.vcf; 212 // import std.path : buildPath, dirName; 213 214 // hts_set_log_level(htsLogLevel.HTS_LOG_INFO); 215 // hts_log_info(__FUNCTION__, "Testing TabixIndexedFile"); 216 // hts_log_info(__FUNCTION__, "Loading test file"); 217 // auto vcf = VCFReader(buildPath(dirName(dirName(dirName(__FILE__))),"htslib","test","index.vcf")); 218 // auto vcfw = VCFWriter() 219 220 // } 221 222 /** 223 Range that allows reading a record based format via tabix. 224 Needs a record type that encompasses only one line of text 225 and a ChromInterval region to use for tabix filtering. 226 Rectype could be GFF3Record, BedRecord ... 227 This is a sister struct to dhtslib.bgzf.RecordReader. 228 */ 229 struct RecordReaderRegion(RecType, CoordSystem cs) 230 { 231 /// file reader 232 TabixIndexedFile file; 233 /// file reader range 234 ReturnType!(this.initializeRange) range; 235 /// chrom of region 236 string chrom; 237 /// coordinates of region 238 Interval!cs coords; 239 /// keep the header 240 string header; 241 242 bool emptyLine = false; 243 244 /// string chrom and Interval ctor 245 this(string fn, string chrom, Interval!cs coords, int extra_threads = -1, string fnIdx = "") 246 { 247 this.file = TabixIndexedFile(fn, extra_threads, fnIdx); 248 this.chrom = chrom; 249 this.coords = coords; 250 this.header = this.file.header; 251 this.range = this.initializeRange; 252 this.range.empty; 253 } 254 255 /// copy the TabixIndexedFile.region range 256 auto initializeRange() 257 { 258 return this.file.region(this.chrom, this.coords); 259 } 260 261 /// returns RecType 262 RecType front() 263 { 264 return RecType(this.range.front); 265 } 266 267 /// move the range 268 void popFront() 269 { 270 this.range.popFront; 271 if(this.range.front == "") this.emptyLine = true; 272 } 273 274 /// is the range done 275 auto empty() 276 { 277 return this.emptyLine || this.range.empty; 278 } 279 280 typeof(this) save() 281 { 282 typeof(this) newRange = this; 283 newRange.range = this.range.save; 284 return newRange; 285 } 286 }