Leer un archivo VCF más rápido con java 8, htsjdk y java.util.stream.Stream

corrientes de Java 8 “Admite operaciones de estilo funcional en flujos de elementos, como transformaciones de reducción de mapas en colecciones”. En esta publicación, mostraré cómo he implementado un java.util.stream.Stream de variantes de VCF que cuenta el número de elementos en dbsnp.

Este ejemplo utiliza el API java htsjdk para leer variantes.

Cuando se usan flujos paralelos, la idea principal es implementar un java.util.Spliterator eso dividirá el diccionario de secuencias (el genoma) en un máximo de N (aquí N=5) partes. Cada parte contará el número de variantes en el genoma 1/N en su propio hilo. Como estamos usando un VCF indexado por tribble, es fácil comenzar a contar en una posición determinada del genoma.

ContigPos

la clase ContigPos define un cromosoma y una posición en todo el genoma.

class ContigPos 

contiene una función para convertir su posición en un índice en todo el genoma usando el diccionario del genoma (htsjdk.samtools.SAMSequenceDictionary) .

long genomicIndex() 

VariantContextSpliterator

VariantContextSpliterator es la clase principal. Divide el archivo VCF en partes e implementa Separador .

public class VariantContextSpliterator
    implements Closeable,Spliterator<VariantContext> {
(...)

VariantContextSpliterator contiene el diccionario de secuencias y la ruta al archivo VCF indexado

/** current VCF File reader */
private VCFFileReader vcfFileReader = null;
/** genome dictionary */
private final SAMSequenceDictionary dict ;

Cada VariantContextSpliterator tiene es propio privado VCFileReader y iterador cerrable. Ambos deben cerrarse cuando no haya más variantes para leer.

/** current VCF File reader */
private VCFFileReader vcfFileReader = null;
/** current variant iterator */
private CloseableIterator<VariantContext> variantIter = null;

Cada VariantContextSpliterator tiene una región genómica dedicada.

/* region start */
private ContigPos begin;
/** region end */
private ContigPos end ;

el primero VariantContextSpliterator escaneará:

  • de begin = new ContigPos("chr1",0)
  • a end = new ContigPos("chrY",(size_of_chrY))

No queremos abrir muchos subprocesos, por lo que estamos rastreando la cantidad de iteradores abiertos en un AtomicInteger

AtomicInteger nsplits

VariantContextSpliterator.peek()

VariantContextSpliterator.peek() es un método que mira a escondidas la siguiente variante en el intervalo genómico.

Abrimos el VCCFileReader si nunca se abrió, se incrementa el número de archivos abiertos.

/** VCF reader was never opened */
if( this.vcfFileReader == null )  

Si bien no hay más variantes disponibles en este cromosoma, abra el siguiente cromosoma para leer:

while(!this.variantIter.hasNext()) 

obtenga la siguiente variante, actualice ‘comenzar’ con esta variante. Cerramos el VCFfileReader si hemos llegado al final de la ventana genómica.

/* get the next variant */
final VariantContext ctx = this.variantIter.next();
/* update 'begin' */
this.begin= new ContigPos(ctx.getContig(), ctx.getStart());

/** close if the end of the genomic location was reached */
if((this.begin.tid > this.end.tid) ||
   (this.begin.tid == this.end.tid && this.begin.pos >= this.end.pos) ) 
this._peeked = ctx;
return this._peeked;

VariantContextSpliterator.tryAdvance()

Si existe una variante restante, realiza la acción dada en ella, devolviendo verdadero; de lo contrario devuelve falso.

@Override
public boolean tryAdvance(Consumer<? super VariantContext> action) 

VariantContextSpliterator.trySplit()

intentar dividir devuelve un VariantContextSpliterator que cubre elementos que, al regresar de este método, no estarán cubiertos por este VariantContextSpliterator. Podemos dividir si el tamaño de la ventana restante es superior a 1 Mb y si el número de VCFReaderFile abiertos es inferior a 10.

public Spliterator<VariantContext> trySplit() 

Pruebas

para obtener un flujo, usamos la función estática java.util.stream.StreamSupport.stream se llama.

stream() Crea un nuevo Stream secuencial o paralelo a partir de un Spliterator. El spliterator solo se atraviesa, se divide o se consulta por el tamaño estimado después de que comienza la operación terminal de la tubería de flujo.

private Stream<VariantContext> stream(boolean parallel) 

Contamos el número de variantes en dbSNP. Imprimimos la duración para arroyo(), flujoParalelo() y un iterador estándar.

final File vcFile =new File(args[0]);
StreameableVcfFile test= new StreameableVcfFile(vcFile);
long start1 = System.currentTimeMillis();
System.out.println("count;"+test.parallelStream().count());
long end1 = System.currentTimeMillis();
System.out.println(" parallelstream: " + ((end1 - start1) / 1000));



long start2 = System.currentTimeMillis();
System.out.println("count;"+test.stream().count());
long end2 = System.currentTimeMillis();
System.out.println("stream : " + ((end2 - start2) / 1000));


long start3 = System.currentTimeMillis();
CloseableIterator<VariantContext>  r= new VCFFileReader(vcFile).iterator();
int n=0;
while(r.hasNext()) 
r.close();
long end3 = System.currentTimeMillis();
 System.out.println("count;"+n);
System.out.println("iter : " + ((end3 - start3) / 1000));
count: 61045456 snps
parallelstream: 80 seconds

count: 61045456 snps
stream : 365 seconds

count: 61045456 snps
iter : 355 seconds

Eso es todo,

Pedro

Código fuente

Fuente del artículo

Deja un comentario