Java 8 Streams to filter by average greater than oldest element

I am trying to filter a bunch of person's in a city such that average of their ages is greater than the age of the person who has the oldest created_at timestamp in the database.

I am doing something like the below,

LinkedBlockingDeque<Person> allAges = null;
LinkedBlockingDeque<Person> filteredAges = new LinkedBlockingDeque<Person>();

allAges = ageStorage.getAllAgesByCityOrderByInsertionTime("city A");

allAges.stream()
       .filter(this.getFirstInsertedAgeGreaterThanAverage(allAges))
       .forEach(filteredAges::add);

And the getFirstInsertedAgeGreaterThanAverage is as follows,

private static Predicate<Integer> getFirstInsertedAgeGreaterThanAverage(LinkedBlockingDeque<Person> personList){
    return p -> (personList.stream().mapToInt(Person::getAge).average() >
     personList.peekFirst().getAge());
}

I guess there is something not so right here but not sure what...Is there a way this could be done without the getFirstInsertedAgeGreaterThanAverage method

Answers


It's not clear from your question which exactly subset you want. Including only one person with the maximal age (or nothing if it happens to be the first one) is the a valid answer. So I assume that you want to get the maximal possible such subset. As @tobias_k noticed, this could be solved sorting the input by age, decreasing, and select the longest prefix for which the average do not exceed the limit.

Unfortunately this cannot be solved in single Stream using standard Stream API. A possible solution might look like this:

public static List<Person> maxSubSetWithGreaterAverage(Collection<Person> persons,
        int averageLimit) {
    List<Person> list = new ArrayList<>(persons);
    // Sort people by age, decreasing
    list.sort(Comparator.comparingInt(Person::getAge).reversed());
    // get all the ages
    int[] ages = list.stream().mapToInt(Person::getAge).toArray();
    // transform them to cumulative sums
    Arrays.parallelPrefix(ages, Integer::sum);
    // Find the longest prefix for which the cumulative sum is bigger
    // than average
    int length = IntStream.range(0, ages.length)
            .filter(count -> ages[count] <= averageLimit * (count + 1)).findFirst()
            .orElse(ages.length);
    // return the corresponding subList
    return list.subList(0, length);
}

Usage:

List<Person> filtered = maxSubSetWithGreaterAverage(allAges, 
            allAges.peekFirst().getAge());

However without using Stream API and parallelPrefix the solution looks better, works faster and eats less memory:

public static List<Person> maxSubSetWithGreaterAverage(Collection<Person> persons,
        int averageLimit) {
    List<Person> list = new ArrayList<>(persons);
    list.sort(Comparator.comparingInt(Person::getAge).reversed());
    int cumulativeAge = 0;
    for(int i=0; i<list.size(); i++) {
        cumulativeAge += list.get(i).getAge();
        if(cumulativeAge <= averageLimit * (i + 1) )
            return list.subList(0, i);
    }
    return list;
}

Using my StreamEx library it's possible to define custom intermediate operation which will perform the necessary filtering in single Stream, though this requires advanced magic:

public static <T> UnaryOperator<StreamEx<T>> takeWhileAverageGreater(
        ToIntFunction<? super T> keyExtractor, int averageLimit) {
    return s -> takeWhileAverageGreater(
            s.sorted(Comparator.comparingInt(keyExtractor).reversed()),
            keyExtractor, 0L, 0L, averageLimit);
}

private static <T> StreamEx<T> takeWhileAverageGreater(StreamEx<T> input,
        ToIntFunction<? super T> keyExtractor, long count, long cumulativeSum,
        int averageLimit) {
    return input.headTail((head, tail) -> {
        // head is the first element, tail is the Stream of the rest
        // update current sum
        long newSum = cumulativeSum + keyExtractor.applyAsInt(head);
        // short-circuit via null if the limit is reached
        // otherwise call myself for the tail prepending with head
        return newSum <= averageLimit * (count + 1) ? null :
           takeWhileAverageGreater(tail, keyExtractor, count + 1, newSum, averageLimit)
               .prepend(head);
    });
}

Now new takeWhileAverageGreater operation can be used like this:

List<Person> filtered = StreamEx.of(allAges)
        .chain(takeWhileAverageGreater(Person::getAge, allAges.peekFirst().getAge()))
        .toList();

The result is the same.


Need Your Help

Connection problems with PostgreSQL XADataSource in Glassfish

java database postgresql glassfish

I'm new to GlassFish, and our company has recently migrated an old system from JBoss 2-something to GlassFish v3. Almost everything works fine, but we are having connection problems with a PostGre...

how to get a javascript code through the console

javascript jquery

Sometimes while I explore a website I came across a javascript function that can serve my future needs so I use the firebug console or google chrome and by inspecting the element on which the funct...