IT TIP

데이터 저장소에서 많은 수의 ndb 항목을 쿼리하는 모범 사례

itqueen 2020. 11. 28. 13:22
반응형

데이터 저장소에서 많은 수의 ndb 항목을 쿼리하는 모범 사례


App Engine 데이터 저장소에서 흥미로운 한계에 부딪 혔습니다. 프로덕션 서버 중 하나에서 일부 사용 데이터를 분석하는 데 도움이되는 핸들러를 만들고 있습니다. 분석을 수행하려면 데이터 저장소에서 가져온 10,000 개 이상의 항목을 쿼리하고 요약해야합니다. 계산은 어렵지 않고 사용 샘플의 특정 필터를 통과하는 항목의 히스토그램 일뿐입니다. 문제는 쿼리 기한에 도달하기 전에 처리를 수행 할 수있을만큼 데이터 저장소에서 데이터를 빠르게 가져올 수 없다는 것입니다.

성능을 향상시키기 위해 쿼리를 병렬 RPC 호출로 청크하기 위해 생각할 수있는 모든 것을 시도했지만 appstats에 따르면 쿼리를 실제로 병렬로 실행할 수없는 것 같습니다. 어떤 방법을 시도하든 (아래 참조) RPC가 순차적 인 다음 쿼리의 폭포로 돌아가는 것처럼 보입니다.

참고 : 쿼리 및 분석 코드는 작동하지만 데이터 저장소에서 데이터를 충분히 빠르게 가져올 수 없기 때문에 느리게 실행됩니다.

배경

공유 할 수있는 라이브 버전이 없지만 여기에 제가 말하는 시스템 부분에 대한 기본 모델이 있습니다.

class Session(ndb.Model):
   """ A tracked user session. (customer account (company), version, OS, etc) """
   data = ndb.JsonProperty(required = False, indexed = False)

class Sample(ndb.Model):
   name      = ndb.StringProperty  (required = True,  indexed = True)
   session   = ndb.KeyProperty     (required = True,  kind = Session)
   timestamp = ndb.DateTimeProperty(required = True,  indexed = True)
   tags      = ndb.StringProperty  (repeated = True,  indexed = True)

샘플은 사용자가 주어진 이름의 기능을 사용할 때로 생각할 수 있습니다. (예 : 'systemA.feature_x'). 태그는 고객 세부 정보, 시스템 정보 및 기능을 기반으로합니다. 예 : [ 'winxp', '2.5.1', ​​'systemA', 'feature_x', 'premium_account']). 따라서 태그는 관심있는 샘플을 찾는 데 사용할 수있는 비정규 화 된 토큰 세트를 형성합니다.

내가하려는 분석은 날짜 범위를 취하고 고객 계정 (사용자가 아닌 회사) 당 하루 (또는 시간당) 사용 된 기능 집합 (아마 모든 기능)의 기능이 몇 번 이었는지 묻는 것으로 구성됩니다.

따라서 핸들러에 대한 입력은 다음과 같습니다.

  • 시작일
  • 종료일
  • 태그

출력은 다음과 같습니다.

[{
   'company_account': <string>,
   'counts': [
      {'timeperiod': <iso8601 date>, 'count': <int>}, ...
   ]
 }, ...
]

쿼리에 대한 공통 코드

다음은 모든 쿼리에 공통적 인 코드입니다. 핸들러의 일반적인 구조는 쿼리 매개 변수를 설정하고, 쿼리를 실행하고, 결과를 처리하고, 반환 할 데이터를 생성하는 webapp2를 사용하는 간단한 get 핸들러입니다.

# -- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500   # Bring in large groups of entities

q = Sample.query()
q = q.order(Sample.timestamp)

# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))

def handle_sample(sample):
   session_obj = sample.session.get()    # Usually found in local or memcache thanks to ndb
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)

시도한 방법

데이터 스토어에서 가능한 한 빨리 병렬로 데이터를 가져 오기 위해 다양한 방법을 시도했습니다. 지금까지 시도한 방법은 다음과 같습니다.

A. 단일 반복

이것은 다른 방법과 비교할 수있는 단순한 기본 사례에 가깝습니다. 난 그냥 쿼리를 작성하고 모든 항목을 반복하여 ndb가 항목을 하나씩 끌어 오는 작업을 수행하도록합니다.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)

for sample in q_iter:
   handle_sample(sample)

B. 큰 가져 오기

여기서 아이디어는 내가 하나의 매우 큰 가져 오기를 수행 할 수 있는지 확인하는 것이 었습니다.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)

for sample in samples:
   handle_sample(sample)

C. 시간 범위에서 비동기 가져 오기

여기서 아이디어는 전체 시간 영역을 청크로 분할하고 비동기를 사용하여 각각을 병렬로 실행하는 독립적 인 쿼리 집합을 만들 수 있도록 샘플이 시간에 따라 상당히 잘 배치되어 있음을 인식하는 것입니다.

# split up timestamp space into 20 equal parts and async query each of them
ts_delta       = (end_time - start_time) / 20
cur_start_time = start_time
q_futures = []

for x in range(ts_intervals):
   cur_end_time = (cur_start_time + ts_delta)
   if x == (ts_intervals-1):    # Last one has to cover full range
      cur_end_time = end_time

   f = q.filter(Sample.timestamp >= cur_start_time,
                Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
   q_futures.append(f)
   cur_start_time = cur_end_time

# Now loop through and collect results
for f in q_futures:
   samples = f.get_result()
   for sample in samples:
      handle_sample(sample)

D. 비동기 매핑

이 방법을 시도한 이유는 문서에서 Query.map_async 메서드를 사용할 때 ndb가 자동으로 일부 병렬 처리를 이용할 수있는 것처럼 들리기 때문입니다.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)

@ndb.tasklet
def process_sample(sample):
   period_ts   = getPeriodTimestamp(sample.timestamp)
   session_obj = yield sample.session.get_async()    # Lookup the session object from cache
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)
   raise ndb.Return(None)

q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()

결과

전체 응답 시간과 앱 통계 추적을 수집하기 위해 하나의 예제 쿼리를 테스트했습니다. 결과는 다음과 같습니다.

A. 단일 반복

실제 : 15.645 초

이것은 차례로 일괄 가져 오기를 통해 순차적으로 진행된 다음 memcache에서 모든 세션을 검색합니다.

방법 A 앱 통계

B. 큰 가져 오기

실제 : 12.12s

옵션 A와 사실상 동일하지만 어떤 이유로 인해 조금 더 빠릅니다.

방법 B 앱 통계

C. 시간 범위에서 비동기 가져 오기

실제 : 15.251 초

처음에는 더 많은 병렬성을 제공하는 것처럼 보이지만 결과를 반복하는 동안 next에 대한 일련의 호출로 인해 속도가 느려지는 것 같습니다. 또한 세션 memcache 조회를 보류중인 쿼리와 겹칠 수없는 것 같습니다.

방법 C 앱 통계

D. 비동기 매핑

실제 : 13.752s

이것은 내가 이해하기 가장 어렵습니다. 겹치는 부분이 많은 것처럼 보이지만 모든 것이 병렬이 아닌 폭포에서 펼쳐지는 것처럼 보입니다.

방법 D 앱 통계

권장 사항

이 모든 것을 바탕으로 내가 무엇을 놓치고 있습니까? App Engine의 제한에 도달 했습니까? 아니면 많은 수의 항목을 병렬로 가져 오는 더 좋은 방법이 있습니까?

다음에 무엇을 시도해야할지 모르겠습니다. 클라이언트를 다시 작성하여 앱 엔진에 여러 요청을 병렬로 보내는 것을 생각했지만 이것은 꽤 무자비한 것 같습니다. 앱 엔진이이 사용 사례를 처리 할 수 ​​있어야하므로 내가 놓친 것이 있다고 생각합니다.

최신 정보

결국 나는 옵션 C가 내 경우에 가장 적합하다는 것을 알았습니다. 6.1 초만에 완료하도록 최적화 할 수있었습니다. 여전히 완벽하지는 않지만 훨씬 좋습니다.

여러 사람의 조언을받은 후 다음 항목이 이해하고 기억해야 할 핵심 사항이라는 것을 알게되었습니다.

  • 여러 쿼리를 병렬로 실행할 수 있습니다.
  • 한 번에 10 개의 RPC 만 비행 할 수 있습니다.
  • 보조 쿼리가없는 지점까지 비정규 화를 시도하십시오.
  • 이러한 유형의 작업은 실시간 쿼리가 아닌 축소 및 작업 대기열을 매핑하는 것이 좋습니다.

그래서 더 빠르게 만들기 위해 내가 한 일 :

  • I partitioned the query space from the beginning based upon time. (note: the more equal the partitions are in terms of entities returned, the better)
  • I denormalized the data further to remove the need for the secondary session query
  • I made use of ndb async operations and wait_any() to overlap the queries with the processing

I am still not getting the performance I would expect or like, but it is workable for now. I just wish their was a better way to pull large numbers of sequential entities into memory quickly in handlers.


Large processing like this should not be done in a user request, which has a 60s time limit. Instead, it should be done in a context that supports long-running requests. The task queue supports requests up to 10 minutes, and (I believe) normal memory restraints (F1 instances, the default, have 128MB of memory). For even higher limits (no request timeout, 1GB+ of memory), use backends.

Here's something to try: set up a URL that, when accessed, fires off a task queue task. It returns a web page that polls every ~5s to another URL that responds with true/false if the task queue task has been completed yet. The task queue processes the data, which can take some 10s of seconds, and saves the result to the datastore either as the computed data or a rendered web page. Once the initial page detects that it has completed, the user is redirected to the page, which fetches the now computed results from the datastore.


The new experimental Data Processing feature (an AppEngine API for MapReduce) looks very suitable for solving this problem. It does automatic sharding to execute multiple parallel worker processes.


I have a similar problem and after working with Google support for few weeks I can confirm there is no magic solution at least as of December 2017.

tl;dr: One can expect throughput from 220 entities/second for standard SDK running on B1 instance up to 900 entities/second for a patched SDK running on a B8 instance.

The limitation is CPU related and changing the instanced type directly impacts performance. This is confirmed by similar results obtained on B4 and B4_1G instances

The best throughput I got for an Expando entity with about 30 fields is:

Standard GAE SDK

  • B1 instance: ~220 entities/second
  • B2 instance: ~250 entities/second
  • B4 instance: ~560 entities/second
  • B4_1G instance: ~560 entities/second
  • B8 instance: ~650 entities/second

Patched GAE SDK

  • B1 instance: ~420 entities/second
  • B8 instance: ~900 entities/second

For standard GAE SDK I tried various approaches including multi-threading but the best proved to be fetch_async with wait_any. Current NDB library already does a great job of using async and futures under the hood so any attempt to push that using threads only make it worse.

I found two interesting approaches to optimize this:

Matt Faus explains the problem very well:

GAE SDK provides an API for reading and writing objects derived from your classes to the datastore. This saves you the boring work of validating raw data returned from the datastore and repackaging it into an easy-to-use object. In particular, GAE uses protocol buffers to transmit raw data from the store to the frontend machine that needs it. The SDK is then responsible for decoding this format and returning a clean object to your code. This utility is great, but sometimes it does a bit more work than you would like. [...] Using our profiling tool, I discovered that fully 50% of the time spent fetching these entities was during the protobuf-to-python-object decoding phase. This means that the CPU on the frontend server was a bottleneck in these datastore reads!

GAE 데이터 액세스 웹 요청

Both approaches try to reduce the time spent doing protobuf to Python decoding by reducing the number of fields decoded.

I tried both approaches but I only succeed with Matt's. SDK internals changed since Evan published his solution. I had to change a bit the code published by Matt here, but is was pretty easy - if there is interest I can publish the final code.

For a regular Expando entity with about 30 fields I used Matt's solution to decode only couple fields and obtained a significant improvement.

In conclusion one need to plan accordingly and don't expect to be able to process much more than few hundreds entities in a "real-time" GAE request.


Large data operations on App Engine best implemented using some sort of mapreduce operation.

Here's a video describing the process, but including BigQuery https://developers.google.com/events/io/sessions/gooio2012/307/

It doesn't sound like you need BigQuery, but you probably want to use both the Map and Reduce portions of the pipeline.

The main difference between what you're doing and the mapreduce situation is that you're launching one instance and iterating through the queries, where on mapreduce, you would have a separate instance running in parallel for each query. You will need a reduce operation to "sum up" all the data, and write the result somewhere though.

The other problem you have is that you should use cursors to iterate. https://developers.google.com/appengine/docs/java/datastore/queries#Query_Cursors

반복기가 쿼리 오프셋을 사용하는 경우 오프셋이 동일한 쿼리를 발행하고 여러 결과를 건너 뛰고 다음 세트를 제공하는 반면 커서는 다음 세트로 바로 이동하므로 비효율적입니다.

참고 URL : https://stackoverflow.com/questions/11509368/best-practice-to-query-large-number-of-ndb-entities-from-datastore

반응형