RxJava가 Observable을 병렬로 가져 오기
RxJava에서 병렬 비동기 호출을 구현하는 데 도움이 필요합니다. FIRST 호출이 표시 할 제품 목록 (타일)을 가져 오는 (대신 검색) 간단한 사용 사례를 선택했습니다. 후속 호출이 나가서 (A) 리뷰 및 (B) 제품 이미지를 가져옵니다.
몇 번의 시도 끝에 나는 이곳에 도착했습니다.
1 Observable<Tile> searchTile = searchServiceClient.getSearchResults(searchTerm);
2 List<Tile> allTiles = new ArrayList<Tile>();
3 ClientResponse response = new ClientResponse();
4 searchTile.parallel(oTile -> {
5 return oTile.flatMap(t -> {
6 Observable<Reviews> reviews = reviewsServiceClient.getSellerReviews(t.getSellerId());
7 Observable<String> imageUrl = reviewsServiceClient.getProductImage(t.getProductId());
8 return Observable.zip(reviews, imageUrl, (r, u) -> {
9 t.setReviews(r);
10 t.setImageUrl(u);
11 return t;
12 });
13 });
14 }).subscribe(e -> {
15 allTiles.add((Tile) e);
16 });
1 행 : 나가서 표시 할 제품 (타일)을 가져옵니다.
4 행 : Observable 목록을 가져 와서 리뷰와 imageUrl을 가져 오기 위해 SHARD합니다.
거짓말 6,7 : Observable 리뷰와 Observable URL 가져 오기
8 행 : 마지막으로 업데이트 된 Observable을 반환하기 위해 2 개의 Observable이 압축됩니다.
15 행 : 마지막으로 15 행은 호출 계층으로 다시 반환 할 수있는 컬렉션에 표시 할 모든 개별 제품을 수집합니다.
Observable이 샤딩되고 테스트에서 4 개의 다른 스레드를 통해 실행되는 동안; 리뷰와 이미지를 가져 오는 것은 속속 인 것 같습니다. 8 행의 zip 단계가 기본적으로 2 개의 관찰 가능 항목 (리뷰 및 URL)을 순차적으로 호출하는 것으로 의심됩니다.
이 그룹은 병렬 가져 오기 reiews 및 이미지 URL에 대한 제안이 있습니까? 본질적으로 위에 첨부 된 폭포 차트는 더 수직으로 쌓여 있어야합니다. 리뷰와 이미지에 대한 호출은 병행해야합니다.
감사합니다 anand raman
병렬 연산자는 거의 모든 사용 사례에서 문제가되는 것으로 판명되었으며 대부분의 예상을 수행하지 않으므로 1.0.0.rc.4 릴리스에서 제거되었습니다 : https://github.com/ReactiveX/RxJava/ 당기기 / 1716
이 유형의 동작을 수행하고 병렬 실행을 얻는 방법에 대한 좋은 예는 여기에서 볼 수 있습니다 .
예제 코드에서 searchServiceClient
동기 또는 비동기 인지 명확하지 않습니다 . 이미 비동기식 인 것처럼 추가 스케줄링이 필요하지 않은 것처럼 문제를 해결하는 방법에 약간 영향을줍니다. 동기식 추가 스케줄링이 필요한 경우.
먼저 다음은 동기 및 비동기 동작을 보여주는 몇 가지 간단한 예입니다.
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class ParallelExecution {
public static void main(String[] args) {
System.out.println("------------ mergingAsync");
mergingAsync();
System.out.println("------------ mergingSync");
mergingSync();
System.out.println("------------ mergingSyncMadeAsync");
mergingSyncMadeAsync();
System.out.println("------------ flatMapExampleSync");
flatMapExampleSync();
System.out.println("------------ flatMapExampleAsync");
flatMapExampleAsync();
System.out.println("------------");
}
private static void mergingAsync() {
Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking().forEach(System.out::println);
}
private static void mergingSync() {
// here you'll see the delay as each is executed synchronously
Observable.merge(getDataSync(1), getDataSync(2)).toBlocking().forEach(System.out::println);
}
private static void mergingSyncMadeAsync() {
// if you have something synchronous and want to make it async, you can schedule it like this
// so here we see both executed concurrently
Observable.merge(getDataSync(1).subscribeOn(Schedulers.io()), getDataSync(2).subscribeOn(Schedulers.io())).toBlocking().forEach(System.out::println);
}
private static void flatMapExampleAsync() {
Observable.range(0, 5).flatMap(i -> {
return getDataAsync(i);
}).toBlocking().forEach(System.out::println);
}
private static void flatMapExampleSync() {
Observable.range(0, 5).flatMap(i -> {
return getDataSync(i);
}).toBlocking().forEach(System.out::println);
}
// artificial representations of IO work
static Observable<Integer> getDataAsync(int i) {
return getDataSync(i).subscribeOn(Schedulers.io());
}
static Observable<Integer> getDataSync(int i) {
return Observable.create((Subscriber<? super Integer> s) -> {
// simulate latency
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
s.onNext(i);
s.onCompleted();
});
}
}
다음은 코드와 더 일치하는 예제를 제공하려는 시도입니다.
import java.util.List;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class ParallelExecutionExample {
public static void main(String[] args) {
final long startTime = System.currentTimeMillis();
Observable<Tile> searchTile = getSearchResults("search term")
.doOnSubscribe(() -> logTime("Search started ", startTime))
.doOnCompleted(() -> logTime("Search completed ", startTime));
Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> {
Observable<Reviews> reviews = getSellerReviews(t.getSellerId())
.doOnCompleted(() -> logTime("getSellerReviews[" + t.id + "] completed ", startTime));
Observable<String> imageUrl = getProductImage(t.getProductId())
.doOnCompleted(() -> logTime("getProductImage[" + t.id + "] completed ", startTime));
return Observable.zip(reviews, imageUrl, (r, u) -> {
return new TileResponse(t, r, u);
}).doOnCompleted(() -> logTime("zip[" + t.id + "] completed ", startTime));
});
List<TileResponse> allTiles = populatedTiles.toList()
.doOnCompleted(() -> logTime("All Tiles Completed ", startTime))
.toBlocking().single();
}
private static Observable<Tile> getSearchResults(String string) {
return mockClient(new Tile(1), new Tile(2), new Tile(3));
}
private static Observable<Reviews> getSellerReviews(int id) {
return mockClient(new Reviews());
}
private static Observable<String> getProductImage(int id) {
return mockClient("image_" + id);
}
private static void logTime(String message, long startTime) {
System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms");
}
private static <T> Observable<T> mockClient(T... ts) {
return Observable.create((Subscriber<? super T> s) -> {
// simulate latency
try {
Thread.sleep(1000);
} catch (Exception e) {
}
for (T t : ts) {
s.onNext(t);
}
s.onCompleted();
}).subscribeOn(Schedulers.io());
// note the use of subscribeOn to make an otherwise synchronous Observable async
}
public static class TileResponse {
public TileResponse(Tile t, Reviews r, String u) {
// store the values
}
}
public static class Tile {
private final int id;
public Tile(int i) {
this.id = i;
}
public int getSellerId() {
return id;
}
public int getProductId() {
return id;
}
}
public static class Reviews {
}
}
결과는 다음과 같습니다.
Search started => 65ms
Search completed => 1094ms
getProductImage[1] completed => 2095ms
getSellerReviews[2] completed => 2095ms
getProductImage[3] completed => 2095ms
zip[1] completed => 2096ms
zip[2] completed => 2096ms
getProductImage[2] completed => 2096ms
getSellerReviews[1] completed => 2096ms
zip[3] completed => 2096ms
All Tiles Completed => 2097ms
getSellerReviews[3] completed => 2097ms
각 IO 호출을 1000ms가 걸리도록 시뮬레이션하여 대기 시간이 어디에 있는지 그리고 병렬로 발생하고 있음을 분명히했습니다. 진행 상황을 밀리 초 단위로 인쇄합니다.
여기서 트릭은 flatMap이 비동기 호출을 병합하므로 병합되는 Observable이 비동기 인 한 모두 동시에 실행된다는 것입니다.
같은 호출 getProductImage(t.getProductId())
이 동기식이면 다음과 같이 비동기식으로 만들 수 있습니다 : getProductImage (t.getProductId ()). subscribeOn (Schedulers.io).
다음은 모든 로깅 및 상용구 유형이없는 위 예제의 중요한 부분입니다.
Observable<Tile> searchTile = getSearchResults("search term");;
Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> {
Observable<Reviews> reviews = getSellerReviews(t.getSellerId());
Observable<String> imageUrl = getProductImage(t.getProductId());
return Observable.zip(reviews, imageUrl, (r, u) -> {
return new TileResponse(t, r, u);
});
});
List<TileResponse> allTiles = populatedTiles.toList()
.toBlocking().single();
이게 도움이 되길 바란다.
IDE가 아직 JDK 8 소스를 자동으로 감지하지 못하는 JDK 7 인 사람들과 @benjchristensen의 위의 훌륭한 응답 (및 설명)을 시도해 볼 수있는 사람들은이 뻔뻔한 JDK 7 코드를 사용할 수 있습니다. 놀라운 설명과 예를 위해 @benjchristensen에게 찬사를 보냅니다!
import java.util.List;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;
public class ParallelExecutionExample
{
public static void main(String[] args)
{
final long startTime = System.currentTimeMillis();
Observable<Tile> searchTile = getSearchResults("search term")
.doOnSubscribe(new Action0()
{
@Override
public void call()
{
logTime("Search started ", startTime);
}
})
.doOnCompleted(new Action0()
{
@Override
public void call()
{
logTime("Search completed ", startTime);
}
});
Observable<TileResponse> populatedTiles = searchTile.flatMap(new Func1<Tile, Observable<TileResponse>>()
{
@Override
public Observable<TileResponse> call(final Tile t)
{
Observable<Reviews> reviews = getSellerReviews(t.getSellerId())
.doOnCompleted(new Action0()
{
@Override
public void call()
{
logTime("getSellerReviews[" + t.id + "] completed ", startTime);
}
});
Observable<String> imageUrl = getProductImage(t.getProductId())
.doOnCompleted(new Action0()
{
@Override
public void call()
{
logTime("getProductImage[" + t.id + "] completed ", startTime);
}
});
return Observable.zip(reviews, imageUrl, new Func2<Reviews, String, TileResponse>()
{
@Override
public TileResponse call(Reviews r, String u)
{
return new TileResponse(t, r, u);
}
})
.doOnCompleted(new Action0()
{
@Override
public void call()
{
logTime("zip[" + t.id + "] completed ", startTime);
}
});
}
});
List<TileResponse> allTiles = populatedTiles
.toList()
.doOnCompleted(new Action0()
{
@Override
public void call()
{
logTime("All Tiles Completed ", startTime);
}
})
.toBlocking()
.single();
}
private static Observable<Tile> getSearchResults(String string)
{
return mockClient(new Tile(1), new Tile(2), new Tile(3));
}
private static Observable<Reviews> getSellerReviews(int id)
{
return mockClient(new Reviews());
}
private static Observable<String> getProductImage(int id)
{
return mockClient("image_" + id);
}
private static void logTime(String message, long startTime)
{
System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms");
}
private static <T> Observable<T> mockClient(final T... ts)
{
return Observable.create(new Observable.OnSubscribe<T>()
{
@Override
public void call(Subscriber<? super T> s)
{
try
{
Thread.sleep(1000);
}
catch (Exception e)
{
}
for (T t : ts)
{
s.onNext(t);
}
s.onCompleted();
}
})
.subscribeOn(Schedulers.io());
// note the use of subscribeOn to make an otherwise synchronous Observable async
}
public static class TileResponse
{
public TileResponse(Tile t, Reviews r, String u)
{
// store the values
}
}
public static class Tile
{
private final int id;
public Tile(int i)
{
this.id = i;
}
public int getSellerId()
{
return id;
}
public int getProductId()
{
return id;
}
}
public static class Reviews
{
}
}
참조 URL : https://stackoverflow.com/questions/26249030/rxjava-fetching-observables-in-parallel
'IT TIP' 카테고리의 다른 글
Elasticsearch의 모든 문서 덤프 (0) | 2021.01.07 |
---|---|
파이썬에서 중괄호와 대괄호의 차이점은 무엇입니까? (0) | 2021.01.07 |
Github 사용자의 모든 공개 댓글을 어떻게 찾을 수 있나요? (0) | 2021.01.07 |
루프 내부 또는 외부에 개체를 선언 하시겠습니까? (0) | 2021.01.07 |
세션에 목록 저장 (0) | 2021.01.07 |