cloud bee

kinesis data analyfice flink 실습 본문

AWS/data

kinesis data analyfice flink 실습

who you 2022. 10. 24. 20:19

나는 해당 깃허브와, AWS 공식문서를 통해 실습을 진행하였다.

데이터 스트림을 소스 및 싱크로 사용하여 flink용 kinesis data analytics를 생성하였다.

 

https://github.com/aws-samples/amazon-kinesis-data-analytics-flink-starter-kit 

 

GitHub - aws-samples/amazon-kinesis-data-analytics-flink-starter-kit: Amazon Kinesis Data Analytics Flink Starter Kit helps you

Amazon Kinesis Data Analytics Flink Starter Kit helps you with the development of Flink Application with Kinesis Stream as a source and Amazon S3 as a sink. This demonstrates the use of Session Win...

github.com

 

https://docs.aws.amazon.com/ko_kr/streams/latest/dev/get-started-exercise.html

 

3단계: Flink 애플리케이션을 위한 Kinesis Data Analytics 생성 및 실행 - Amazon Kinesis Data Streams

콘솔을 사용하여 Flink용 Kinesis Data Analytics Analytics을 생성할 때 애플리케이션에 대해 IAM 역할 및 정책을 생성할 수 있습니다. 애플리케이션은 이 역할 및 정책을 사용하여 종속 리소스에 액세스합

docs.aws.amazon.com

 

깃허브에서 요구하고 잇는 사전 조건은 다음과 같다.

사전 조건

  • 1. JDK 11 버전을 설치한다.
  • 2. visual studio와 같은 프로그램을 사용한다.(vim 써도 될 듯)
  • 3. Apache Maven을 설치한다.
  • 4. AWS CLI를 설치하고 사용할 수 있게끔 설정한다.
  • 5. Apache Flink를 1.11.1로 테스트하였다.

 

요구 사항

요구 사항

 

  • 1. amazon S3 Bucket가 필요하다.
  • 2. amazon kinesis data stream가 필요하다.
  • 3. amazon kinesis data analytics flink application이 필요하다.
  • 4. 4개의 정책이 존재하는 IAM 정책이 필요하다.

 

실습

일단 나는 기본 VPC를 생성하여 demo-instance를 생성하여 모든 작업을 진행할 것이다. 우선 기본 VPC부터 생성하도록 한다.

기본 VPC 생성
보안그룹 세팅

 

vpc를 생성하고 demo-instance에서 사용될 보안그룹을 생성해 준다. 이제 ec2로 가서 demo-instance를 생성해 준다.

인스턴스 생성
인스턴스 세팅
인스턴스 권한 부여

 

시간 관계상 AdministratorAccess권한을 부여하고 인스턴스를 생성해 준다. ssh 접근 방식은 kay pair로 한다.

 

생성하였으면 아래와 같이 kay pair로 접속해 준다.

인스턴스 접속

 

아래와 같은 명령어를 입력하여 java-11-amazon-corretto와 maven을 설치하여 준다.

sudo yum install java-11-amazon-corretto -y
wget https://dlcdn.apache.org/maven/maven-3/3.8.6/binaries/apache-maven-3.8.6-bin.tar.gz
tar -zxvf apache-maven-3.8.6-bin.tar.gz
sudo mv ./apache-maven-3.8.6/ /bin/
cd /bin
sudo ln -s apache-maven-3.8.6/bin/mvn mvn

 

위에 코드블록에 작성하였던 명령어를 그래도 입력하였다면 mvn이라는 커멘드를 한 번 입력하고 실행해 본다.

mvn을 입력하고 실행하였을 때 아래와 같은 문구가 뜬다면 정상적으로 설치가 제대로 된 것이다.

mvn 확인

이제 입력스트림과 출력스트림을 생성하도록 한다.

 

입력스트림 생성 명령어

aws kinesis create-stream --stream-name ExampleInputStream --shard-count 1 --region ap-northeast-2

 

출력스트림 생성 명령어

aws kinesis create-stream --stream-name ExampleOutputStream --shard-count 1 --region ap-northeast-2

 

이제 샘플 레코드를 생성하여 입력스트림에 쓰도록 한다.

stock.py를 생성하고 kinesis stream으로 쓰도록 한다.

cd ~
cat << EOF > stock.py 
import datetime
import json
import random
import boto3

STREAM_NAME = "ExampleInputStream"


def get_data():
    return {
        'EVENT_TIME': datetime.datetime.now().isoformat(),
        'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'PRICE': round(random.random() * 100, 2)}


def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey="partitionkey")


if __name__ == '__main__':
    generate(STREAM_NAME, boto3.client('kinesis'))
EOF
sudo mv stock.py /bin/
cd /bin
pip3 install boto3
python3 stock.py

# 사전에 미리 aws configure 명령어로 리전부분 기입하도록 함

 

flink 스트리밍

cd ~
sudo yum install git -y
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
cd amazon-kinesis-data-analytics-java-examples/GettingStarted
sed -i 's/us-west-2/ap-northeast-2/g' ./src/main/java/com/amazonaws/services/kinesisanalytics/BasicStreamingJob.java
sudo sed -i 's/${flink.version}/1.13.1/g' ./pom.xml
mvn package

위에 적었던 명령어를 복사하여 붙여 넣기를 한다.

mvn package를 진행하게 되면 다음과 같이 빌드 성공 문구가 뜬다.

빌드 성공

이제 S3에 aws-kinesis-analytics-java-apps-1.0.jar을 업로드해 주도록 한다.

aws s3 mb s3://demo-korea

aws s3 cp target/aws-kinesis-analytics-java-apps-1.0.jar s3://demo-korea

 

 

이제 kinesis analytics를 생성해 준다.

우선 console 창에 'kinesis analytics'를 검색하도록 한다.

Flink 구성

분석 애플리케이션 생성 부분에서 다음과 같이 구성한다. Apache Flink의 버전은 1.13을 사용하도록 한다.

애플리케이션의 이름은 MyApplication으로 지정한다.

 

이제 지정하고 생성하였다면 구성을 설정하도록 한다.

amazon S3 버킷은 이전에 생성해 주었던 s3://demo-korea로 지정해 준다.

객체에 대한 경로는 aws-kinesis-analytics-java-apps-1.0.jar로 지정한다.

애플리케이션 코드 위치

 

이제 런타임 속성을 다음 표와 같이 기입하고 변경사항을 저장해 준다.

런타임 속성 지정

 

ProducerConfigProperties flink.inputstream.initpos LATEST
ProducerConfigProperties aws:region ap-northeast-2
ProducerConfigProperties AggregationEnabled false
ConsumerConfigProperties aws:region ap-northeast-2
ProducerConfigProperties group.id ProducerConfigProperties

 

이제 analytics를 실행시켜 준다. 

애플리케이션 실행

 

대시보드를 열면 정상적으로 동작하는 것을 확인할 수 있다.

하지만 바이트가 수신이 되지 않는 문제점이 있다, 고칠 점이 있으면 댓글로 알려줬으면 한다.

나중에 공부할 때 참고용으로 쓴 것이라 양해 좀 해주었으면 한다.

실행 결과물

Comments