MessageIterator.java (3028B)
1 package protobuf; 2 3 import java.io.IOException; 4 import java.io.InputStream; 5 import java.util.ArrayList; 6 import java.util.Iterator; 7 import java.util.List; 8 9 import protobuf.exception.InputException; 10 import protobuf.exception.OverflowException; 11 12 /** 13 * Iterator for parsing Protocol Buffers from an input stream. 14 * 15 * <p> 16 * The iterator reads and parses wire streams from the provided input stream and 17 * produces {@link ProtobufReader} instances, representing the parsed 18 * messages. 19 * 20 * @see ProtobufReader 21 */ 22 public class MessageIterator implements Iterator<ProtobufReader> { 23 /** 24 * The input stream from which the Protocol Buffers are read. 25 */ 26 protected final InputStream input; 27 28 /** 29 * The remaining length of the input stream. It is decremented as messages are 30 * read. 31 */ 32 protected int length; 33 34 /** 35 * A list of delayed operations to be executed when there are no more messages 36 * to read. 37 */ 38 protected List<Runnable> delayed = new ArrayList<>(); 39 40 /** 41 * Constructs a new MessageIterator with the given input stream and length. 42 * 43 * @param input The input stream containing the Protocol Buffers. 44 * @param length The initial length of the input stream. 45 */ 46 public MessageIterator(InputStream input, int length) { 47 this.input = input; 48 this.length = length; 49 } 50 51 /** 52 * Reads a variable-length integer (varint) from the input stream. 53 * 54 * @return The parsed varint value. 55 * @throws OverflowException If the input exceeds the expected size. 56 * @throws InputException If an I/O error occurs during reading. 57 */ 58 private int varint() { 59 int result = 0; 60 int b = 0; 61 int shift = 0; 62 while (shift < 32 && length > 0) { 63 try { 64 b = input.read(); 65 } catch (IOException exc) { 66 throw new InputException(exc); 67 } 68 if (b == -1) 69 break; 70 length--; 71 72 result |= (b & 0x7f) << shift; 73 shift += 7; 74 if ((b & 0x80) == 0) 75 return result; 76 } 77 throw new OverflowException("input exceed"); 78 } 79 80 /** 81 * Checks if there are more Protocol Buffers messages to be read. 82 * 83 * @return {@code true} if there are more messages; {@code false} otherwise. 84 */ 85 @Override 86 public boolean hasNext() { 87 if (length == 0) { 88 delayed.forEach(Runnable::run); 89 delayed.clear(); 90 } 91 return length > 0; 92 } 93 94 /** 95 * Reads the next wire stream and creates a {@link ProtobufReader} 96 * instance. 97 * 98 * @return The parsed {@link ProtobufReader} instance. 99 */ 100 @Override 101 public ProtobufReader next() { 102 int tag = varint(); 103 104 // least-significant 3 bits are the type, remaining is the tag value 105 // type = tag & 0b00000111 106 return new ProtobufReader(this, WireType.values()[tag & 0x07], tag >> 3); 107 } 108 }