1 /**
2 Module providing a wrapper, `TabixIndexedFile` over a line-oriented NGS flat file,
3 such as BED, GFF3, VCF that has been indexed with tabix.
4 
5 The wrapper provides a list of reference sequence names, as well as iterator over
6 all rows falling within a sequence range, e.g. "chr1:1000-2000"
7 */
8 
9 module dhtslib.tabix;
10 
11 import std.stdio;
12 import std.string;
13 import std.traits : ReturnType;
14 import std.range : inputRangeObject, InputRangeObject;
15 import std.parallelism : totalCPUs;
16 import core.stdc.stdlib : malloc, free;
17 
18 import htslib.hts;
19 import htslib.hts_log;
20 import htslib.kstring;
21 import htslib.tbx;
22 import dhtslib.coordinates;
23 import dhtslib.memory;
24 import dhtslib.util;
25 //import htslib.regidx;
26 
27 /** Encapsulates a position-sorted record-oriented NGS flat file,
28  *  indexed with Tabix, including BED, GFF3, VCF.
29  *
30  *  region(string r) returns an InputRange that iterates through rows of the file intersecting or contained within the requested range 
31  */
32 struct TabixIndexedFile {
33 
34     HtsFile fp;    /// rc wrapper around htsFile ptr
35     Tbx   tbx;   /// rc wrapper around tabix ptr
36 
37     string header;  /// NGS flat file's header (if any; e.g. BED may not have one)
38 
39     /// Initialize with a complete file path name to the tabix-indexed file
40     /// The tabix index (.tbi) must already exist alongside
41     this(const(char)[] fn, int extra_threads = -1, const(char)[] fntbi = "")
42     {
43         debug(dhtslib_debug) { writeln("TabixIndexedFile ctor"); }
44         this.fp = HtsFile(hts_open( toStringz(fn), "r"));
45         if ( !this.fp ) {
46             writefln("Could not read %s\n", fn);
47             throw new Exception("Couldn't read file");
48         }
49 
50         if (extra_threads == -1)
51         {
52             if ( totalCPUs > 1)
53             {
54                 hts_log_info(__FUNCTION__,
55                         format("%d CPU cores detected; enabling multithreading", totalCPUs));
56                 // hts_set_threads adds N _EXTRA_ threads, so totalCPUs - 1 seemed reasonable,
57                 // but overcomitting by 1 thread (i.e., passing totalCPUs) buys an extra 3% on my 2-core 2013 Mac
58                 hts_set_threads(this.fp, totalCPUs);
59             }
60         } else if (extra_threads > 0)
61         {
62             if ((extra_threads + 1) > totalCPUs)
63                 hts_log_warning(__FUNCTION__, "More threads requested than CPU cores detected");
64             hts_set_threads(this.fp, extra_threads);
65         }
66         else if (extra_threads == 0)
67         {
68             hts_log_debug(__FUNCTION__, "Zero extra threads requested");
69         }
70 
71         //enum htsExactFormat format = hts_get_format(fp)->format;
72         if(fntbi!="") this.tbx = Tbx(tbx_index_load2( toStringz(fn), toStringz(fntbi) ));
73         else this.tbx = Tbx(tbx_index_load( toStringz(fn) ));
74         if (!this.tbx) { 
75             writefln("Could not load .tbi index of %s\n", fn );
76             throw new Exception("Couldn't load tabix index file");
77         }
78 
79         loadHeader();
80     }
81 
82     /// Explicit postblit to avoid 
83     /// https://github.com/blachlylab/dhtslib/issues/122
84     this(this)
85     {
86         this.fp = fp;
87         this.tbx = tbx;
88         this.header = header;
89     }
90 
91     private void loadHeader()
92     {
93         kstring_t str;
94         scope(exit) { free(str.s); }
95 
96         while ( hts_getline(this.fp, '\n', &str) >= 0 )
97         {
98             if ( !str.l || str.s[0] != this.tbx.conf.meta_char ) break;
99             this.header ~= fromStringz(str.s) ~ '\n';
100         }
101 
102         debug(dhtslib_debug) { writeln("end loadHeader"); }
103     }
104 
105     /// tbx.d: const(char **) tbx_seqnames(tbx_t *tbx, int *n);  // free the array but not the values
106     @property string[] sequenceNames()
107     {
108         // TODO: Check for memory leaks (free the array after copy into sequence_names)
109         int nseqs;
110 
111         string[] sequence_names;
112 
113         const(char **) seqnames = tbx_seqnames(this.tbx, &nseqs);
114 
115         for(int i; i<nseqs; i++) {
116             sequence_names ~= cast(string) fromStringz(seqnames[i]);
117         }
118 
119         return sequence_names;
120     }
121 
122     /** region(r)
123      *  returns an InputRange that iterates through rows of the file intersecting or contained within the requested range 
124      */
125     auto region(CoordSystem cs)(string chrom, Interval!cs coords)
126     {
127         auto newCoords = coords.to!(CoordSystem.zbho);
128         struct Region
129         {
130 
131             /** TODO: determine how thread-(un)safe this is (i.e., using a potentially shared *fp and *tbx) */
132             private HtsFile fp;
133             private Tbx tbx;
134 
135             private HtsItr itr;
136             private string next;
137 
138             // necessary because the alternative strategy of preloading the first row
139             // leads to problems when Struct inst is blitted ->
140             // re-iterating always returns first row only (since *itr is expended 
141             // but first row was preloaded in this.next)
142             private bool active;
143             private string chrom;
144 
145             this(HtsFile fp, Tbx tbx,  string chrom, Interval!(CoordSystem.zbho) coords)
146             {
147                 this.fp = fp;
148                 this.tbx = tbx;
149                 this.chrom = chrom;
150                 this.itr = HtsItr(tbx_itr_queryi(tbx, tbx_name2id(tbx, toStringz(this.chrom)), coords.start, coords.end));
151                 this.empty;
152                 debug(dhtslib_debug) { writeln("Region ctor // this.itr: ", this.itr); }
153             }
154 
155             // had to remove "const" property from empty() due to manipulation of this.active
156             @property bool empty() {
157 
158                 if (!this.active) {
159                     // this is the first call to empty() (and use of the range)
160                     // Let's make it active and attempt to load the first record, if one exists
161                     this.active = true;
162                     this.popFront();
163                 }
164 
165                 if (!this.next) return true;
166                 else return false;
167             }
168 
169             @property string front() const {
170                 return this.next;
171             }
172 
173             void popFront() {
174                 // closure over fp and tbx? (i.e. potentially unsafe?)
175 
176                 // Get next entry
177                 kstring_t kstr;
178                 immutable res = tbx_itr_next(this.fp, this.tbx, this.itr, &kstr);
179                 if (res < 0) {
180                     // we are done
181                     this.next = null;
182                 } else {
183                     // Otherwise load into next
184                     this.next = fromStringz(kstr.s).idup;
185                     free(kstr.s);
186                 }
187             }
188             Region save()
189             {
190                 Region newRange;
191                 newRange.fp = HtsFile(copyHtsFile(fp));
192                 newRange.itr = HtsItr(copyHtsItr(itr));
193                 newRange.tbx = tbx;
194                 newRange.next = next;
195                 newRange.active = active;
196                 newRange.chrom = chrom;
197                 return newRange;
198             }
199         }
200 
201         return Region(this.fp, this.tbx, chrom, newCoords);
202     }
203 
204 }
205 
206 // TODO: figure out how to make this unittest with just htslib files
207 
208 // debug(dhtslib_unittest) unittest
209 // {
210 //     import htslib.hts_log;
211 //     import dhtslib.vcf;
212 //     import std.path : buildPath, dirName;
213 
214 //     hts_set_log_level(htsLogLevel.HTS_LOG_INFO);
215 //     hts_log_info(__FUNCTION__, "Testing TabixIndexedFile");
216 //     hts_log_info(__FUNCTION__, "Loading test file");
217 //     auto vcf = VCFReader(buildPath(dirName(dirName(dirName(__FILE__))),"htslib","test","index.vcf"));
218 //     auto vcfw = VCFWriter()
219 
220 // }
221 
222 /**
223     Range that allows reading a record based format via tabix.
224     Needs a record type that encompasses only one line of text
225     and a ChromInterval region to use for tabix filtering.
226     Rectype could be GFF3Record, BedRecord ...
227     This is a sister struct to dhtslib.bgzf.RecordReader.
228 */
229 struct RecordReaderRegion(RecType, CoordSystem cs)
230 {
231     /// file reader
232     TabixIndexedFile file;
233     /// file reader range
234     ReturnType!(this.initializeRange) range;
235     /// chrom of region
236     string chrom;
237     /// coordinates of region
238     Interval!cs coords;
239     /// keep the header
240     string header;
241     
242     bool emptyLine = false;
243     
244     /// string chrom and Interval ctor
245     this(string fn, string chrom, Interval!cs coords, int extra_threads = -1, string fnIdx = "")
246     {
247         this.file = TabixIndexedFile(fn, extra_threads, fnIdx);
248         this.chrom = chrom;
249         this.coords = coords;
250         this.header = this.file.header;
251         this.range = this.initializeRange;
252         this.range.empty;
253     }
254 
255     /// copy the TabixIndexedFile.region range
256     auto initializeRange()
257     {
258         return this.file.region(this.chrom, this.coords);
259     }
260 
261     /// returns RecType
262     RecType front()
263     {
264         return RecType(this.range.front);
265     }
266 
267     /// move the range
268     void popFront()
269     {
270         this.range.popFront;
271         if(this.range.front == "") this.emptyLine = true;
272     }
273 
274     /// is the range done
275     auto empty()
276     {
277         return this.emptyLine || this.range.empty;
278     }
279 
280     typeof(this) save()
281     {
282         typeof(this) newRange = this;
283         newRange.range = this.range.save;
284         return newRange;
285     }
286 }