java-8 – 为什么带副作用的过滤器比基于Spliterator的实现表现更好?
|
关于问题 How to skip even lines of a Stream obtained from the Files.lines,我遵循接受的答案方法,基于Spliterator< T>实现我自己的filterEven()方法.界面,例如: public static <T> Stream<T> filterEven(Stream<T> src) {
Spliterator<T> iter = src.spliterator();
AbstractSpliterator<T> res = new AbstractSpliterator<T>(Long.MAX_VALUE,Spliterator.ORDERED)
{
@Override
public boolean tryAdvance(Consumer<? super T> action) {
iter.tryAdvance(item -> {}); // discard
return iter.tryAdvance(action); // use
}
};
return StreamSupport.stream(res,false);
}
我可以通过以下方式使用它: Stream<DomainObject> res = Files.lines(src)
filterEven(res)
.map(line -> toDomainObject(line))
然而,测量这种方法对使用带有副作用的filter()的下一个方法的性能,我注意到下一个方法表现更好: final int[] counter = {0};
final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0;
Stream<DomainObject> res = Files.lines(src)
.filter(line -> isEvenLine ())
.map(line -> toDomainObject(line))
我用JMH测试了性能,我没有在基准测试中包含文件负载.我之前将它加载到一个数组中.然后每个基准测试首先创建一个Stream< String>从上一个数组,然后过滤偶数行,然后应用mapToInt()来提取int字段的值,最后提取max()操作.这是它的基准之一(你可以检查整个程序here,这里有data file with about 186 lines): @Benchmark
public int maxTempFilterEven(DataSource src){
Stream<String> content = Arrays.stream(src.data)
.filter(s-> s.charAt(0) != '#') // Filter comments
.skip(1); // Skip line: Not available
return filterEven(content) // Filter daily info and skip hourly
.mapToInt(line -> parseInt(line.substring(14,16)))
.max()
.getAsInt();
}
我不明白为什么filter()方法比filterEven()(~50ops / ms)具有更好的性能(~80ops / ms)? 解决方法介绍我想我知道原因,但不幸的是我不知道如何提高基于Spliterator的解决方案的性能(至少不重写整个Streams API功能). 旁注1:设计Stream API时,性能不是最重要的设计目标.如果性能至关重要,那么很可能在没有Stream API的情况下重写代码会使代码更快. (例如,Stream API不可避免地增加了内存分配,从而增加了GC压力).另一方面,在大多数情况下,Stream API以相对较小的性能降级为代价提供更好的更高级API. 第1部分或简短的理论答案 流被设计为实现一种内部迭代,因为消费和外部迭代(即基于Spliterator)的主要手段是一种“模拟”的附加手段.因此,外部迭代涉及一些开销.懒惰增加了外部迭代效率的一些限制,并且需要支持flatMap,因此在此过程中需要使用某种动态缓冲区. 旁注2在某些情况下,基于Spliterator的迭代可能与内部迭代一样快(即在这种情况下为过滤器).特别是在直接从包含数据的Stream创建Spliterator的情况下.要查看它,您可以修改测试以将第一个过滤器具体化为Strings数组: String[] filteredData = Arrays.stream(src.data)
.filter(s-> s.charAt(0) != '#') // Filter comments
.skip(1)
.toArray(String[]::new);
然后将maxTempFilter和maxTempFilterEven的性能进行比较,以接受预过滤的String [] filteredData.如果你想知道为什么会这样,你可能应该阅读这个长答案的其余部分或至少第2部分. 第2部分或更长的理论答案: 流被设计为主要通过一些终端操作来消费.虽然支持逐个迭代元素但不是设计为消耗流的主要方式. 请注意,使用“功能”流API,例如map,flatMap,filter,reduce和collect,你不能说在某个步骤“我已经有足够的数据,停止迭代源并推送值”.您可以丢弃一些传入的数据(如过滤器那样)但不能停止迭代. (获取和跳过转换实际上是使用Spliterator实现的;而anyMatch,allMatch,noneMatch,findFirst,findAny等使用非公共API j.u.s.Sink.cancellationRequested,它们也更容易,因为不能有多个终端操作).如果管道中的所有转换都是同步的,您可以将它们组合成单个聚合函数(Consumer)并在一个简单的循环中调用它(可选地将循环执行分成几个线程).这就是我基于状态的过滤器的简化版本所代表的内容(请参阅“显示一些代码”部分中的代码).如果管道中有flatMap但想法仍然相同,则会变得有点复杂. 基于Spliterator的转换从根本上是不同的,因为它为管道添加了异步的消费者驱动步骤.现在Spliterator而不是源Stream驱动迭代过程.如果您直接在源Stream上请求Spliterator,它可能会返回一些只是迭代其内部数据结构的实现,这就是为什么实现预过滤数据应该消除性能差异.但是,如果为某些非空管道创建Spliterator,除了要求源通过管道逐个推送元素直到某个元素通过所有过滤器之外,没有其他(简单)选择(另请参见第二个示例)给我看一些代码部分).源元素被逐个推送而不是在某些批次中被推动的事实是使Streams变得懒惰的基本决定的结果.需要一个缓冲区而不是一个元素是支持flatMap的结果:从源中推送一个元素可以为Spliterator生成许多元素. 第3部分或给我看一些代码 这部分试图为“理论”部分中描述的代码(实际代码和模拟代码的链接)提供一些支持. 首先,你应该知道当前的Streams API实现将非终端(中间)操作累积到一个惰性管道中(参见j.u.s.AbstractPipeline及其子节点,如j.u.s.ReferencePipeline.然后,当应用终端操作时,原始元素中的所有元素流被“推”通过管道. 你看到的是两件事的结果: >流量管道在您遇到的情况下不同的事实 具有状态过滤器的代码或多或少类似于以下简单代码: static int similarToFilter(String[] data)
{
final int[] counter = {0};
final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0;
int skip = 1;
boolean reduceEmpty = true;
int reduceState = 0;
for (String outerEl : data)
{
if (outerEl.charAt(0) != '#')
{
if (skip > 0)
skip--;
else
{
if (isEvenLine.test(outerEl))
{
int intEl = parseInt(outerEl.substring(14,16));
if (reduceEmpty)
{
reduceState = intEl;
reduceEmpty = false;
}
else
{
reduceState = Math.max(reduceState,intEl);
}
}
}
}
}
return reduceState;
}
请注意,这实际上是一个单循环,内部有一些计算(过滤/转换). 另一方面,当您将Spliterator添加到管道中时,事情会发生显着变化,即使简化代码与实际发生的代码相似也会变得更大,例如: interface Sp<T>
{
public boolean tryAdvance(Consumer<? super T> action);
}
static class ArraySp<T> implements Sp<T>
{
private final T[] array;
private int pos;
public ArraySp(T[] array)
{
this.array = array;
}
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
if (pos < array.length)
{
action.accept(array[pos]);
pos++;
return true;
}
else
{
return false;
}
}
}
static class WrappingSp<T> implements Sp<T>,Consumer<T>
{
private final Sp<T> sourceSp;
private final Predicate<T> filter;
private final ArrayList<T> buffer = new ArrayList<T>();
private int pos;
public WrappingSp(Sp<T> sourceSp,Predicate<T> filter)
{
this.sourceSp = sourceSp;
this.filter = filter;
}
@Override
public void accept(T t)
{
buffer.add(t);
}
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
while (true)
{
if (pos >= buffer.size())
{
pos = 0;
buffer.clear();
sourceSp.tryAdvance(this);
}
// failed to fill buffer
if (buffer.size() == 0)
return false;
T nextElem = buffer.get(pos);
pos++;
if (filter.test(nextElem))
{
action.accept(nextElem);
return true;
}
}
}
}
static class OddLineSp<T> implements Sp<T>,Consumer<T>
{
private Sp<T> sourceSp;
public OddLineSp(Sp<T> sourceSp)
{
this.sourceSp = sourceSp;
}
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
if (sourceSp == null)
return false;
sourceSp.tryAdvance(this);
if (!sourceSp.tryAdvance(action))
{
sourceSp = null;
}
return true;
}
@Override
public void accept(T t)
{
}
}
static class ReduceIntMax
{
boolean reduceEmpty = true;
int reduceState = 0;
public int getReduceState()
{
return reduceState;
}
public void accept(int t)
{
if (reduceEmpty)
{
reduceEmpty = false;
reduceState = t;
}
else
{
reduceState = Math.max(reduceState,t);
}
}
}
static int similarToSpliterator(String[] data)
{
ArraySp<String> src = new ArraySp<>(data);
int[] skip = new int[1];
skip[0] = 1;
WrappingSp<String> firstFilter = new WrappingSp<String>(src,(s) ->
{
if (s.charAt(0) == '#')
return false;
if (skip[0] != 0)
{
skip[0]--;
return false;
}
return true;
});
OddLineSp<String> oddLines = new OddLineSp<>(firstFilter);
final ReduceIntMax reduceIntMax = new ReduceIntMax();
while (oddLines.tryAdvance(s ->
{
int intValue = parseInt(s.substring(14,16));
reduceIntMax.accept(intValue);
})) ; // do nothing in the loop body
return reduceIntMax.getReduceState();
}
这段代码更大,因为在循环内部没有一些非平凡的有状态回调的情况下,逻辑是不可能的(或者至少非常难).这里接口Sp是j.u.s.Stream和j.u.Spliterator接口的混合. (编辑:安卓应用网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
