BlobSpliterator.java (5511B)
1 package osm.protobuf; 2 3 import java.io.IOException; 4 import java.io.InputStream; 5 import java.io.RandomAccessFile; 6 import java.util.ArrayList; 7 import java.util.List; 8 import java.util.Spliterator; 9 import java.util.concurrent.locks.Lock; 10 import java.util.concurrent.locks.ReentrantLock; 11 import java.util.function.Consumer; 12 import java.util.stream.Stream; 13 import java.util.stream.StreamSupport; 14 15 import osm.common.RandomAccessFileInputStream; 16 import osm.message.Blob; 17 import osm.message.BlobHeader; 18 import osm.message.Entity; 19 import osm.message.HeaderBlock; 20 21 /** 22 * A {@link Spliterator} implementation for iterating over lists of 23 * {@link Entity} objects parsed from OpenStreetMap blobs. 24 */ 25 public class BlobSpliterator implements Spliterator<List<Entity>> { 26 /** 27 * BlobHeader is never bigger than 64K. 28 */ 29 private static final int MAX_HEADER_SIZE = 64 * 1024; 30 31 /** 32 * Blob is never bigger than 32M. 33 */ 34 private static final int MAX_BLOB_SIZE = 32 * 1024 * 1024; 35 36 private final RandomAccessFile input; 37 private final InputStream stream; 38 private final Lock lock; 39 private final Consumer<HeaderBlock> onHeader; 40 private BlobHeader[] headers; 41 42 private final int start; 43 private int end = 0; 44 private int current = 0; 45 46 /** 47 * Constructs a BlobSpliterator with the specified parameters. 48 * 49 * @param input The {@link RandomAccessFile} used for reading blobs. 50 * @param onHeader A {@link Consumer} to handle the parsed {@link HeaderBlock} 51 * objects. 52 */ 53 public BlobSpliterator(RandomAccessFile input, Consumer<HeaderBlock> onHeader) { 54 this.input = input; 55 this.onHeader = onHeader; 56 this.stream = new RandomAccessFileInputStream(input); 57 this.lock = new ReentrantLock(); 58 59 List<BlobHeader> headerList = new ArrayList<>(); 60 try { 61 while (input.getFilePointer() < input.length()) { 62 int headerLength = input.readInt(); 63 64 if (headerLength > MAX_HEADER_SIZE) 65 throw new RuntimeException( 66 "blob header exceeds " + MAX_HEADER_SIZE + " bytes (" + headerLength + ")"); 67 68 BlobHeader header = new BlobHeader().parse(stream, headerLength); 69 70 if (header.size > MAX_BLOB_SIZE) 71 throw new RuntimeException("blob exceeds " + MAX_BLOB_SIZE + " bytes (" + header.size + ")"); 72 73 header.offset = input.getFilePointer(); 74 headerList.add(header); 75 76 input.skipBytes(header.size); 77 } 78 } catch (IOException e) { 79 throw new RuntimeException(e); 80 } 81 82 headers = new BlobHeader[headerList.size()]; 83 headerList.toArray(headers); 84 85 start = 0; 86 current = 0; 87 end = headers.length; 88 } 89 90 private BlobSpliterator(RandomAccessFile input, Lock lock, Consumer<HeaderBlock> onHeader, BlobHeader[] headers, 91 int start, int end) { 92 this.input = input; 93 this.lock = lock; 94 this.onHeader = onHeader; 95 this.stream = new RandomAccessFileInputStream(input); 96 this.headers = headers; 97 this.start = start; 98 this.current = start; 99 this.end = end; 100 } 101 102 @Override 103 public Spliterator<List<Entity>> trySplit() { 104 // locking because of end 105 lock.lock(); 106 if (current < end - 1) 107 return null; 108 109 int mid = (end - start) / 2; 110 int otherEnd = end; 111 112 end = start + mid; 113 lock.unlock(); 114 115 return new BlobSpliterator(input, lock, onHeader, headers, end, otherEnd); 116 } 117 118 @Override 119 public boolean tryAdvance(Consumer<? super List<Entity>> action) { 120 // locking because of input, end 121 lock.lock(); 122 123 if (current >= end) 124 return false; 125 126 BlobHeader header = headers[current++]; 127 128 try { 129 input.seek(header.offset); 130 } catch (IOException e) { 131 throw new RuntimeException(e); 132 } 133 134 Blob blob = new Blob(header.headerType).parse(stream, header.size); 135 136 // unlocking, every critical variable is unused 137 lock.unlock(); 138 139 if (blob.header != null) 140 onHeader.accept(blob.header); 141 142 if (blob.primitive != null) 143 action.accept(blob.primitive); 144 145 return true; 146 } 147 148 @Override 149 public long estimateSize() { 150 return end - current; 151 } 152 153 @Override 154 public int characteristics() { 155 return DISTINCT | SUBSIZED | SIZED | NONNULL | IMMUTABLE | CONCURRENT; 156 } 157 158 /** 159 * Creates a sequential or parallel {@link Stream} from this 160 * {@link BlobSpliterator}. 161 * 162 * @param parallel A boolean indicating whether the stream should be parallel. 163 * @return A {@link Stream} of lists of {@link Entity} objects. 164 */ 165 public Stream<List<Entity>> stream(boolean parallel) { 166 return StreamSupport.stream(this, parallel); 167 } 168 169 /** 170 * Creates a sequential stream from this {@link BlobSpliterator}. 171 * 172 * @return A sequential {@link Stream} of lists of {@link Entity} objects. 173 */ 174 public Stream<List<Entity>> stream() { 175 return stream(false); 176 } 177 178 /** 179 * Creates a parallel stream from this {@link BlobSpliterator}. 180 * 181 * @return A parallel {@link Stream} of lists of {@link Entity} objects. 182 */ 183 public Stream<List<Entity>> parallelStream() { 184 return stream(true); 185 } 186 }