spark의 combineByKey는 여전히 어려운데, 다시 예를 들어봅시다.
(홍길동, 영통)
(홍길동, 강남)
(홍길동, 강남)
(안희정, 강남)
(안희정, 마포)
(홍길동, { 영통 : 1, 강남 2 })
(안희정, { 강남 : 1, 마포 1 })
...
# 초기화 함수에서 목표 value의 스키마를 구성한다
def d1(raw_v):
return { raw_v : 0 }
# 구성된 스키마로 구조화되지 않은 값과 구조화된 값이 만날 때
def d2(baked_v, raw_v):
baked_v[raw_v] = baked_v.get(raw_v, 0) + 1
return baked_v
# 이미 스키마로 구조화된 놈들 끼리 병합할 때
def d3(baked_v1, baked_v2):
for k, v in baked_v2.iteritems():
baked_v1[k] = baked_v1.get(k, 0) + v
return baked_v1
...
.combineByKey(d1, d2, d3)
쉽죠?