persolijn

an efficient router for buses
Log | Files | Refs

BlobSpliterator.java (5511B)


      1 package osm.protobuf;
      2 
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import osm.common.RandomAccessFileInputStream;
import osm.message.Blob;
import osm.message.BlobHeader;
import osm.message.Entity;
import osm.message.HeaderBlock;
     20 
     21 /**
     22  * A {@link Spliterator} implementation for iterating over lists of
     23  * {@link Entity} objects parsed from OpenStreetMap blobs.
     24  */
     25 public class BlobSpliterator implements Spliterator<List<Entity>> {
     26     /**
     27      * BlobHeader is never bigger than 64K.
     28      */
     29     private static final int MAX_HEADER_SIZE = 64 * 1024;
     30 
     31     /**
     32      * Blob is never bigger than 32M.
     33      */
     34     private static final int MAX_BLOB_SIZE = 32 * 1024 * 1024;
     35 
     36     private final RandomAccessFile input;
     37     private final InputStream stream;
     38     private final Lock lock;
     39     private final Consumer<HeaderBlock> onHeader;
     40     private BlobHeader[] headers;
     41 
     42     private final int start;
     43     private int end = 0;
     44     private int current = 0;
     45 
     46     /**
     47      * Constructs a BlobSpliterator with the specified parameters.
     48      *
     49      * @param input    The {@link RandomAccessFile} used for reading blobs.
     50      * @param onHeader A {@link Consumer} to handle the parsed {@link HeaderBlock}
     51      *                 objects.
     52      */
     53     public BlobSpliterator(RandomAccessFile input, Consumer<HeaderBlock> onHeader) {
     54         this.input = input;
     55         this.onHeader = onHeader;
     56         this.stream = new RandomAccessFileInputStream(input);
     57         this.lock = new ReentrantLock();
     58 
     59         List<BlobHeader> headerList = new ArrayList<>();
     60         try {
     61             while (input.getFilePointer() < input.length()) {
     62                 int headerLength = input.readInt();
     63 
     64                 if (headerLength > MAX_HEADER_SIZE)
     65                     throw new RuntimeException(
     66                             "blob header exceeds " + MAX_HEADER_SIZE + " bytes (" + headerLength + ")");
     67 
     68                 BlobHeader header = new BlobHeader().parse(stream, headerLength);
     69 
     70                 if (header.size > MAX_BLOB_SIZE)
     71                     throw new RuntimeException("blob exceeds " + MAX_BLOB_SIZE + " bytes (" + header.size + ")");
     72 
     73                 header.offset = input.getFilePointer();
     74                 headerList.add(header);
     75 
     76                 input.skipBytes(header.size);
     77             }
     78         } catch (IOException e) {
     79             throw new RuntimeException(e);
     80         }
     81 
     82         headers = new BlobHeader[headerList.size()];
     83         headerList.toArray(headers);
     84 
     85         start = 0;
     86         current = 0;
     87         end = headers.length;
     88     }
     89 
     90     private BlobSpliterator(RandomAccessFile input, Lock lock, Consumer<HeaderBlock> onHeader, BlobHeader[] headers,
     91             int start, int end) {
     92         this.input = input;
     93         this.lock = lock;
     94         this.onHeader = onHeader;
     95         this.stream = new RandomAccessFileInputStream(input);
     96         this.headers = headers;
     97         this.start = start;
     98         this.current = start;
     99         this.end = end;
    100     }
    101 
    102     @Override
    103     public Spliterator<List<Entity>> trySplit() {
    104         // locking because of end
    105         lock.lock();
    106         if (current < end - 1)
    107             return null;
    108 
    109         int mid = (end - start) / 2;
    110         int otherEnd = end;
    111 
    112         end = start + mid;
    113         lock.unlock();
    114 
    115         return new BlobSpliterator(input, lock, onHeader, headers, end, otherEnd);
    116     }
    117 
    118     @Override
    119     public boolean tryAdvance(Consumer<? super List<Entity>> action) {
    120         // locking because of input, end
    121         lock.lock();
    122 
    123         if (current >= end)
    124             return false;
    125 
    126         BlobHeader header = headers[current++];
    127 
    128         try {
    129             input.seek(header.offset);
    130         } catch (IOException e) {
    131             throw new RuntimeException(e);
    132         }
    133 
    134         Blob blob = new Blob(header.headerType).parse(stream, header.size);
    135 
    136         // unlocking, every critical variable is unused
    137         lock.unlock();
    138 
    139         if (blob.header != null)
    140             onHeader.accept(blob.header);
    141 
    142         if (blob.primitive != null)
    143             action.accept(blob.primitive);
    144 
    145         return true;
    146     }
    147 
    148     @Override
    149     public long estimateSize() {
    150         return end - current;
    151     }
    152 
    153     @Override
    154     public int characteristics() {
    155         return DISTINCT | SUBSIZED | SIZED | NONNULL | IMMUTABLE | CONCURRENT;
    156     }
    157 
    158     /**
    159      * Creates a sequential or parallel {@link Stream} from this
    160      * {@link BlobSpliterator}.
    161      *
    162      * @param parallel A boolean indicating whether the stream should be parallel.
    163      * @return A {@link Stream} of lists of {@link Entity} objects.
    164      */
    165     public Stream<List<Entity>> stream(boolean parallel) {
    166         return StreamSupport.stream(this, parallel);
    167     }
    168 
    169     /**
    170      * Creates a sequential stream from this {@link BlobSpliterator}.
    171      *
    172      * @return A sequential {@link Stream} of lists of {@link Entity} objects.
    173      */
    174     public Stream<List<Entity>> stream() {
    175         return stream(false);
    176     }
    177 
    178     /**
    179      * Creates a parallel stream from this {@link BlobSpliterator}.
    180      *
    181      * @return A parallel {@link Stream} of lists of {@link Entity} objects.
    182      */
    183     public Stream<List<Entity>> parallelStream() {
    184         return stream(true);
    185     }
    186 }