Hello.
I'm new to Vavr and I'm really impressed by how pretty Java code can be made with the library.
I ran into an issue while processing huge text files (a few gigabytes). While reading lines and transforming them (some simple String operations), I get java.lang.OutOfMemoryError: Java heap space, even though I use -Xmx6g.
A plain Java stream does the job easily with much less memory available.
I'm wondering where the issue is. Does it happen because Vavr streams are reusable? Or is there a more appropriate way to read and process files?
I've pasted example code below. I would appreciate any comments.
Cheers!
Some(Paths.get(PATH_TO_A_HUGE_TEXT_FILE))
        .map(path ->
                Try(() -> Files.lines(path)).map(Stream::ofAll).get()
        )
        .map(linesStream -> linesStream.map(String::toUpperCase))
        .forEach(transformedStream -> transformedStream.forEach(s -> { }));
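java.nio.file.Files.lines(java.nio.file.Path) returns a java.util.stream.Stream<String> that processes lines lazily: one by one, forgetting each line once it has been consumed.
Vavr's Stream, however, is a lazy stream with memory: it pulls items from the source stream one by one, but it remembers (memoizes) every value it has evaluated. As long as the head of the stream is still referenced (in a pipeline like the one above, for example by the Option wrapping it), all lines read so far stay reachable and cannot be garbage-collected.
If you don't need the old values, you can use Vavr's Iterator instead: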
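To illustrate the difference, here is a minimal plain-Java sketch of a memoizing lazy list. The Cell class and the range/sum helpers are hypothetical and only model the idea; this is not Vavr's actual Stream implementation. Each element is pulled from the source exactly once and then remembered in its cell, so a second traversal costs no further pulls, but every evaluated cell stays on the heap for as long as the head is referenced.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical minimal memoizing lazy list, illustrating the idea only;
// this is NOT Vavr's actual Stream implementation.
public final class MemoStreamDemo {

    static final class Cell<T> {
        final T head;
        private Supplier<Cell<T>> tailSupplier; // null once the tail has been evaluated
        private Cell<T> tail;                   // memoized tail, kept while this cell is reachable

        Cell(T head, Supplier<Cell<T>> tailSupplier) {
            this.head = head;
            this.tailSupplier = tailSupplier;
        }

        Cell<T> tail() {
            if (tailSupplier != null) {
                tail = tailSupplier.get(); // evaluate the tail once...
                tailSupplier = null;       // ...and remember the result
            }
            return tail;
        }
    }

    // Builds a lazy list of from..n-1, counting every pull from the "source".
    static Cell<Integer> range(int from, int n, AtomicInteger pulls) {
        if (from >= n) {
            return null; // end of the list
        }
        pulls.incrementAndGet(); // one pull from the source per element
        return new Cell<>(from, () -> range(from + 1, n, pulls));
    }

    // Walks the whole list; evaluated cells remain linked from the head.
    static int sum(Cell<Integer> cell) {
        int total = 0;
        for (Cell<Integer> c = cell; c != null; c = c.tail()) {
            total += c.head;
        }
        return total;
    }

    public static void main(String[] args) {
        AtomicInteger pulls = new AtomicInteger();
        Cell<Integer> head = range(0, 5, pulls);
        System.out.println(sum(head) + " " + pulls.get()); // 10 5
        // Second traversal reuses the remembered values: still only 5 pulls.
        System.out.println(sum(head) + " " + pulls.get()); // 10 5
    }
}
```

A java.util.stream.Stream, by contrast, cannot be traversed twice (a second terminal operation throws IllegalStateException), so it has no reason to keep old elements around.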
Option
        .of(PATH_TO_A_HUGE_TEXT_FILE)
        .map(Paths::get)
        .toTry()
        .mapTry(Files::lines)
        .map(BaseStream::iterator)
        .map(Iterator::ofAll)
        .map(lines -> lines.map(String::toUpperCase))
        .forEach(transformed -> transformed.forEach(line -> {
        }));
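Neither snippet closes the stream returned by Files.lines, which keeps the file open until that stream is closed; the JDK documentation recommends try-with-resources when timely release of the file handle matters. Below is a plain-JDK sketch of the same one-pass pattern with the file closed properly. The OnePassLines class and countUpperCaseChars method are hypothetical names; with Vavr you would wrap lines.iterator() in io.vavr.collection.Iterator.ofAll(...) inside the same try block to get map/forEach.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Locale;
import java.util.stream.Stream;

// Hypothetical plain-JDK sketch of one-pass line processing:
// each line is read, transformed and dropped, and the file is closed afterwards.
public final class OnePassLines {

    // Returns the total number of characters after upper-casing every line.
    static long countUpperCaseChars(Path path) throws IOException {
        long total = 0;
        // try-with-resources closes the underlying file when the block exits
        try (Stream<String> lines = Files.lines(path, StandardCharsets.UTF_8)) {
            Iterator<String> it = lines.iterator(); // same as BaseStream::iterator
            while (it.hasNext()) {
                String transformed = it.next().toUpperCase(Locale.ROOT);
                total += transformed.length(); // no reference to earlier lines is kept
            }
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("lines", ".txt");
        try {
            // Writes three lines: "abc", "de" and an empty line.
            Files.write(tmp, Arrays.asList("abc", "de", ""), StandardCharsets.UTF_8);
            System.out.println(countUpperCaseChars(tmp)); // 5
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

Memory use stays flat regardless of file size because only the current line is reachable at any time.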
P.S.
I haven't run this code on a large file yet.
I wrote a short test to compare the memory usage of Java Stream, Vavr Stream and Vavr Iterator.
enum Sizes {
    Bytes(1),
    KiB(1024),
    MiB(1024 * 1024),
    GiB(1024 * 1024 * 1024),
    ;
    private final long value;
    Sizes(long value) {
        this.value = value;
    }
    public static String toString(long value) {
        Sizes size = Bytes;
        for (Sizes s : Sizes.values()) {
            if (s.value <= value) {
                size = s;
            }
        }
        return String.format(Locale.US, "%.2f %s", (double) value / size.value, size.name());
    }
    public static void printMemoryInfo() {
        Runtime runtime = Runtime.getRuntime();
        System.out.println(String.format(
                "Memory[Total: %s; Free: %s; Max: %s; Used: %s]"
                , toString(runtime.totalMemory())
                , toString(runtime.freeMemory())
                , toString(runtime.maxMemory())
                , toString(runtime.totalMemory() - runtime.freeMemory())
        ));
    }
}
private Try<java.util.stream.Stream<String>> readLineOfHugeTextFile(int maxLineLength, long streamSize) {
    String source = "qwertyuiopasdfghjklzxcvbnm QWERTYUIOPASDFGHJKLZXCVBNM";
    int len = source.length();
    Random random = new Random();
    Stream<String> stream = Stream
            .generate(() -> {
                char[] chars = new char[random.nextInt(maxLineLength)];
                for (int i = 0; i < chars.length; i++) {
                    chars[i] = source.charAt(random.nextInt(len));
                }
                return new String(chars);
            })
            .limit(streamSize);
    return Try.of(() -> stream);
}
@Test
public void testJavaStream() {
    Sizes.printMemoryInfo();
    AtomicLong counter = new AtomicLong();
    long used = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
    readLineOfHugeTextFile(1_000, 1_000_000)
            .map(lines -> lines.map(String::toUpperCase))
            .forEach(transformed -> transformed.forEach(line -> {
                counter.addAndGet(line.length());
            }));
    Runtime.getRuntime().freeMemory();
    Sizes.printMemoryInfo();
    System.out.println("Total chars: " + counter);
    System.out.println("Used: " + Sizes.toString(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory() - used));
}
@Test
public void testVavrStream() {
    Sizes.printMemoryInfo();
    AtomicLong counter = new AtomicLong();
    long used = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
    readLineOfHugeTextFile(1_000, 1_000_000)
            .map(io.vavr.collection.Stream::ofAll)
            .map(lines -> lines.map(String::toUpperCase))
            .forEach(transformed -> transformed.forEach(line -> {
                counter.addAndGet(line.length());
            }));
    Runtime.getRuntime().freeMemory();
    Sizes.printMemoryInfo();
    System.out.println("Total chars: " + counter);
    System.out.println("Used: " + Sizes.toString(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory() - used));
}
@Test
public void testVavrIterator() {
    Sizes.printMemoryInfo();
    AtomicLong counter = new AtomicLong();
    long used = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
    readLineOfHugeTextFile(1_000, 1_000_000)
            .map(BaseStream::iterator)
            .map(io.vavr.collection.Iterator::ofAll)
            .map(lines -> lines.map(String::toUpperCase))
            .forEach(transformed -> transformed.forEach(line -> {
                counter.addAndGet(line.length());
            }));
    Runtime.getRuntime().freeMemory();
    Sizes.printMemoryInfo();
    System.out.println("Total chars: " + counter);
    System.out.println("Used: " + Sizes.toString(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory() - used));
}
testJavaStream
Memory[Total: 243.50 MiB; Free: 226.85 MiB; Max: 3.51 GiB; Used: 16.65 MiB]
Memory[Total: 486.50 MiB; Free: 271.04 MiB; Max: 3.51 GiB; Used: 215.46 MiB]
Total chars: 499 352 382
Used: 198.82 MiB
testVavrStream
Memory[Total: 243.50 MiB; Free: 226.86 MiB; Max: 3.51 GiB; Used: 16.64 MiB]
Memory[Total: 2.37 GiB; Free: 1.04 GiB; Max: 3.51 GiB; Used: 1.33 GiB]
Total chars: 499 218 992
Used: 1.31 GiB
testVavrIterator
Memory[Total: 243.50 MiB; Free: 226.77 MiB; Max: 3.51 GiB; Used: 16.73 MiB]
Memory[Total: 577.00 MiB; Free: 477.37 MiB; Max: 3.51 GiB; Used: 99.63 MiB]
Total chars: 499 010 003
Used: 82.90 MiB
This is not a complete or rigorous benchmark, of course.
@valery1707 third one is "testVavrIterator", please fix :)
@ruslansennov fixed, thanks!
@valery1707 thanks for such a comprehensive example of use!
As mentioned in my original question, I suspected my issue was caused by the stream elements being remembered, and I was looking for a way to make the stream forget elements once they are used (as Java streams do).
I think it would be beneficial if the documentation contained an example like this of using Iterator. I wasn't able to find this information easily on my own (though of course it's possible I wasn't searching correctly ;) ).
Thanks again for support!
Thank you, we will add better documentation for Vavr 1.0