corrientes de Java 8 “Admite operaciones de estilo funcional en flujos de elementos, como transformaciones de reducción de mapas en colecciones”. En esta publicación, mostraré cómo he implementado un java.util.stream.Stream de variantes de VCF que cuenta el número de elementos en dbsnp.
Este ejemplo utiliza el API java htsjdk para leer variantes.
Cuando se usan flujos paralelos, la idea principal es implementar un java.util.Spliterator
ContigPos
la clase ContigPos define un cromosoma y una posición en todo el genoma.
class ContigPos
contiene una función para convertir su posición en un índice en todo el genoma usando el diccionario del genoma (htsjdk.samtools.SAMSequenceDictionary) .
long genomicIndex()
VariantContextSpliterator
VariantContextSpliterator es la clase principal. Divide el archivo VCF en partes e implementa Separador
public class VariantContextSpliterator
implements Closeable,Spliterator<VariantContext> {
(...)
VariantContextSpliterator contiene el diccionario de secuencias y la ruta al archivo VCF indexado
/** current VCF File reader */
private VCFFileReader vcfFileReader = null;
/** genome dictionary */
private final SAMSequenceDictionary dict ;
Cada VariantContextSpliterator tiene es propio privado VCFileReader y iterador cerrable
/** current VCF File reader */
private VCFFileReader vcfFileReader = null;
/** current variant iterator */
private CloseableIterator<VariantContext> variantIter = null;
Cada VariantContextSpliterator tiene una región genómica dedicada.
/* region start */
private ContigPos begin;
/** region end */
private ContigPos end ;
el primero VariantContextSpliterator escaneará:
- de
begin = new ContigPos("chr1",0)
- a
end = new ContigPos("chrY",(size_of_chrY))
No queremos abrir muchos subprocesos, por lo que estamos rastreando la cantidad de iteradores abiertos en un AtomicInteger
AtomicInteger nsplits
VariantContextSpliterator.peek()
VariantContextSpliterator.peek() es un método que mira a escondidas la siguiente variante en el intervalo genómico.
Abrimos el VCCFileReader si nunca se abrió, se incrementa el número de archivos abiertos.
/** VCF reader was never opened */
if( this.vcfFileReader == null )
Si bien no hay más variantes disponibles en este cromosoma, abra el siguiente cromosoma para leer:
while(!this.variantIter.hasNext())
obtenga la siguiente variante, actualice ‘comenzar’ con esta variante. Cerramos el VCFfileReader si hemos llegado al final de la ventana genómica.
/* get the next variant */
final VariantContext ctx = this.variantIter.next();
/* update 'begin' */
this.begin= new ContigPos(ctx.getContig(), ctx.getStart());
/** close if the end of the genomic location was reached */
if((this.begin.tid > this.end.tid) ||
(this.begin.tid == this.end.tid && this.begin.pos >= this.end.pos) )
this._peeked = ctx;
return this._peeked;
VariantContextSpliterator.tryAdvance()
Si existe una variante restante, realiza la acción dada en ella, devolviendo verdadero; de lo contrario devuelve falso.
@Override
public boolean tryAdvance(Consumer<? super VariantContext> action)
VariantContextSpliterator.trySplit()
intentar dividir devuelve un VariantContextSpliterator que cubre elementos que, al regresar de este método, no estarán cubiertos por este VariantContextSpliterator. Podemos dividir si el tamaño de la ventana restante es superior a 1 Mb y si el número de VCFReaderFile abiertos es inferior a 10.
public Spliterator<VariantContext> trySplit()
Pruebas
para obtener un flujo, usamos la función estática java.util.stream.StreamSupport.stream
se llama.
stream() Crea un nuevo Stream secuencial o paralelo a partir de un Spliterator. El spliterator solo se atraviesa, se divide o se consulta por el tamaño estimado después de que comienza la operación terminal de la tubería de flujo.
private Stream<VariantContext> stream(boolean parallel)
Contamos el número de variantes en dbSNP. Imprimimos la duración para arroyo(), flujoParalelo() y un iterador estándar.
final File vcFile =new File(args[0]);
StreameableVcfFile test= new StreameableVcfFile(vcFile);
long start1 = System.currentTimeMillis();
System.out.println("count;"+test.parallelStream().count());
long end1 = System.currentTimeMillis();
System.out.println(" parallelstream: " + ((end1 - start1) / 1000));
long start2 = System.currentTimeMillis();
System.out.println("count;"+test.stream().count());
long end2 = System.currentTimeMillis();
System.out.println("stream : " + ((end2 - start2) / 1000));
long start3 = System.currentTimeMillis();
CloseableIterator<VariantContext> r= new VCFFileReader(vcFile).iterator();
int n=0;
while(r.hasNext())
r.close();
long end3 = System.currentTimeMillis();
System.out.println("count;"+n);
System.out.println("iter : " + ((end3 - start3) / 1000));
count: 61045456 snps
parallelstream: 80 seconds
count: 61045456 snps
stream : 365 seconds
count: 61045456 snps
iter : 355 seconds
Eso es todo,
Pedro