- 001
- 002
- 003
- 004
- 005
- 006
- 007
- 008
- 009
- 010
- 011
- 012
- 013
- 014
- 015
- 016
- 017
- 018
- 019
- 020
- 021
- 022
- 023
- 024
- 025
- 026
- 027
- 028
- 029
- 030
- 031
- 032
- 033
- 034
- 035
- 036
- 037
- 038
- 039
- 040
- 041
- 042
- 043
- 044
- 045
- 046
- 047
- 048
- 049
- 050
- 051
- 052
- 053
- 054
- 055
- 056
- 057
- 058
- 059
- 060
- 061
- 062
- 063
- 064
- 065
- 066
- 067
- 068
- 069
- 070
- 071
- 072
- 073
- 074
- 075
- 076
- 077
- 078
- 079
- 080
- 081
- 082
- 083
- 084
- 085
- 086
- 087
- 088
- 089
- 090
- 091
- 092
- 093
- 094
- 095
- 096
- 097
- 098
- 099
- 100
public class ConcurrentStringStatsProvider implements StringStatsProvider {
private final ExecutorService executor;
private final ExecutorCompletionService<CharCounter> service;
private final int threadNum;
public ConcurrentStringStatsProvider() {
//http://stackoverflow.com/questions/13834692/threads-configuration-based-on-no-of-cpu-cores
threadNum = Runtime.getRuntime().availableProcessors() + 1;
executor = Executors.newFixedThreadPool(threadNum);
this.service = new ExecutorCompletionService<CharCounter>(executor);
}
@Override
public synchronized CharCounter countChars(String str) {
int length = str.length();
if (length == 0)
return new CharCounter();
int chunk = length / threadNum;
if (chunk == 0)
chunk = length;
for (int i = 0; i < threadNum; i++) {
int start = i * chunk;
int end = (i + 1) * chunk - 1;
if (end > length) {
end = length - 1;
}
service.submit(new SubstringTask(start, end, str));
if (end == length - 1)
break; //break early
}
CharCounter result = null;
while (true) {
Future<CharCounter> future = service.poll();
if (future == null) {
if (result != null && result.getTotalCount() == length)
break;
else
continue;
}
CharCounter subResult = null;
try {
subResult = future.get();
} catch (InterruptedException e) {
log.error("Failed to calculate. Interrupted: ", e);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.error("Calculation error", e);
throw new IllegalStateException("Calculation error: ", e);
}
if (result == null) {
result = subResult;
} else if (result.equals(subResult)) {
break; //!!
} else {
service.submit(new MergeTask(result, subResult));
result = null;
}
}
return result;
}
private class SubstringTask implements Callable<CharCounter> {
private final int start;
private final int end;
private final String str;
public SubstringTask(int start, int end, String str) {
this.start = start;
this.end = end;
this.str = Objects.requireNonNull(str);
}
@Override
public CharCounter call() throws Exception {
return doJob();
}
private CharCounter doJob() {
CharCounter charCounter = new CharCounter(end - start + 1);
for (int i = start; i <= end; i++) {
charCounter.increment(str.charAt(i));
}
return charCounter;
}
}
private class MergeTask implements Callable<CharCounter> {
private final CharCounter cc1, cc2;
public MergeTask(CharCounter cc1, CharCounter cc2) {
this.cc1 = Objects.requireNonNull(cc1);
this.cc2 = Objects.requireNonNull(cc2);
}
@Override
public CharCounter call() throws Exception {
return CharCounter.merge(cc1, cc2);
Первое знакомство с ExecutorCompletionService, решал задачку подсчета количества символов в строке в несколько потоков.
Комментарии (0) RSS
Добавить комментарий