devops

Kinesis Data Stream의 API Gateway 프록시 생성하기 본문

DevOps/AWS

Kinesis Data Stream의 API Gateway 프록시 생성하기

vataops 2022. 8. 11. 15:09
반응형

Kinesis는 데이터를 실시간으로 수집하고 ETL과 전송까지 담당하는 AWS의 서비스다. Data Stream을 실시간으로 수집하는 Kinesis Data Stream은 레코드를 직접 수집하는 것도 가능하지만, API Gateway가 프록시하는 것도 가능하다. 

단순히 PutRecord하는 것 뿐만아니라, API Gateway에서 요청에따라 Stream 생성과 삭제, ListRecord도 가능하다. 콘솔 상에서 설정하는 방법과 Terraform에서 어떻게 설정하는지도 정리해보려고 한다.


Kinesis 데이터 스트림의 API Gateway 프록시를 생성해보자

1) REST API 생성

2) streams 리소스 생성후, 하위 리소스로 다음과 같이 Stream-name 리소스 생성

리소스 경로에 괄호를 사용하여 경로 파라미터를 추가하여 리소스를 생성한다.

3) PutRecord를 위한 Record 리소스를 하위에 추가한다.  

4) Record 리소스에 PUT 메소드를 아래와 같이 생성한다.

나는 편의상 실행 역할에 ' AmazonKinesisFullAccess' 정책을 추가한 역할 ARN을 입력했다.

5) 메소드 통합요청의 HTTP 헤더에 Content-Type: 'x-amz-json-1.1' 를 추가하고, 매핑 템플릿에 아래 아래와 같은 템플릿을 추가한다.

{
    "StreamName": "$input.params('stream-name')",
    "Data": "$util.base64Encode($input.json('$.Data'))",
    "PartitionKey": "$input.path('$.PartitionKey')"
}

6) API를 배포하여 스테이지 생성한다.

이제 API Gateway 프록시는 생성되었다. 이제 생성된 API gateway Endpoint를 활용해 테스트만 해보면 된다.

TroubleShooting

aws가 제공한 공식문서대로 진행했지만, 문제가 생겼다. 아무리 PUT 요청을 해도 'Missing Authentication Token'이라는 403 오류 메시지만 나온다.

Postman으로 PUT 요청

알고보니, Request할 때의 Payload의 형식이 잘못되었다. 위에서 미리 설정한 매핑 템플릿에 맞춰 Payload를 전송해야했다. 

aws 공식 문서 내용

즉, 페이로드에는 전송할 데이터를 담을 "Data"와 샤드를 구분할 "PartitionKey", Stream 이름이 들어갈 "StreamName" 함께 넣어서 보내야한다.

페이로드 예시
올바른 응답, ShardID와 SequenceNumber

+ reference
(https://docs.aws.amazon.com/ko_kr/apigateway/latest/developerguide/integrating-api-with-aws-services-kinesis.html)

데이터를 잘 확인하자.

이번 트러블 슈팅은 내가 설정한 매핑 템플릿에도 불구하고, 템플릿에 맞게 페이로드를 구성하지 않았기 때문에 발생한 문제였다. 이전에는 전달하는 데이터 포맷이 잘못되어 많은 시간을 쏟았었다.

이번 과정은 데이터 파이프라인을 구축하는 것인 만큼, 데이터 테스트부터 ETL, 쿼리까지 데이터를 다루는 것 뿐만아니라 데이터의 포맷과 템플릿 등을 잘 확인하는 것도 중요한 시간이었다.


+ boto3를 활용해서 Data Stream으로 직접 전송하는 python 스크립트
(https://stackoverflow.com/questions/57245093/how-to-upload-the-data-from-python-sdk-to-kinesis-using-boto3)

#Generating the random number of record and sendint to Kinesis data stream

import boto3
import json
from datetime import datetime
import calendar
import random
import time

my_stream_name = 'Flight-Simulator'

kinesis_client = boto3.client('kinesis', region_name='us-east-1')

def put_to_stream(thing_id, property_value, property_timestamp):
payload = {
            'prop': str(property_value),
            'timestamp': str(property_timestamp),
            'thing_id': thing_id
          }

print(payload)

put_response = kinesis_client.put_record(
                    StreamName=my_stream_name,
                    Data=json.dumps(payload),
                    PartitionKey=thing_id)

while True:
    property_value = random.randint(40, 120)
    property_timestamp = calendar.timegm(datetime.utcnow().timetuple())
    thing_id = 'aa-bb'

    put_to_stream(thing_id, property_value, property_timestamp)

    # wait for 5 second
    time.sleep(5)
반응형
Comments