일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- CodeBuild
- istio
- CloudFormation
- S3
- 유나인버거조인트
- APIGateway
- ASG
- Round Robin
- 해킹송
- write back
- EKS
- CodeCommit
- Kubernetes
- 3AZ
- cbt
- cloudwatch-agent
- 쿠버네티스
- AWS
- write Through
- Kinesis
- CodePipeline
- SQS
- 정보처리기능사
- DNS
- server
- access_log
- Lambda
- stateful
- DaemonSet
- CloudWatch
- Today
- Total
cloud bee
kinesis data analyfice flink 실습 본문
나는 해당 깃허브와, AWS 공식문서를 통해 실습을 진행하였다.
데이터 스트림을 소스 및 싱크로 사용하여 flink용 kinesis data analytics를 생성하였다.
https://github.com/aws-samples/amazon-kinesis-data-analytics-flink-starter-kit
https://docs.aws.amazon.com/ko_kr/streams/latest/dev/get-started-exercise.html
깃허브에서 요구하고 잇는 사전 조건은 다음과 같다.
- 1. JDK 11 버전을 설치한다.
- 2. visual studio와 같은 프로그램을 사용한다.(vim 써도 될 듯)
- 3. Apache Maven을 설치한다.
- 4. AWS CLI를 설치하고 사용할 수 있게끔 설정한다.
- 5. Apache Flink를 1.11.1로 테스트하였다.
요구 사항
- 1. amazon S3 Bucket가 필요하다.
- 2. amazon kinesis data stream가 필요하다.
- 3. amazon kinesis data analytics flink application이 필요하다.
- 4. 4개의 정책이 존재하는 IAM 정책이 필요하다.
실습
일단 나는 기본 VPC를 생성하여 demo-instance를 생성하여 모든 작업을 진행할 것이다. 우선 기본 VPC부터 생성하도록 한다.
vpc를 생성하고 demo-instance에서 사용될 보안그룹을 생성해 준다. 이제 ec2로 가서 demo-instance를 생성해 준다.
시간 관계상 AdministratorAccess권한을 부여하고 인스턴스를 생성해 준다. ssh 접근 방식은 kay pair로 한다.
생성하였으면 아래와 같이 kay pair로 접속해 준다.
아래와 같은 명령어를 입력하여 java-11-amazon-corretto와 maven을 설치하여 준다.
sudo yum install java-11-amazon-corretto -y
wget https://dlcdn.apache.org/maven/maven-3/3.8.6/binaries/apache-maven-3.8.6-bin.tar.gz
tar -zxvf apache-maven-3.8.6-bin.tar.gz
sudo mv ./apache-maven-3.8.6/ /bin/
cd /bin
sudo ln -s apache-maven-3.8.6/bin/mvn mvn
위에 코드블록에 작성하였던 명령어를 그래도 입력하였다면 mvn이라는 커멘드를 한 번 입력하고 실행해 본다.
mvn을 입력하고 실행하였을 때 아래와 같은 문구가 뜬다면 정상적으로 설치가 제대로 된 것이다.
이제 입력스트림과 출력스트림을 생성하도록 한다.
입력스트림 생성 명령어
aws kinesis create-stream --stream-name ExampleInputStream --shard-count 1 --region ap-northeast-2
출력스트림 생성 명령어
aws kinesis create-stream --stream-name ExampleOutputStream --shard-count 1 --region ap-northeast-2
이제 샘플 레코드를 생성하여 입력스트림에 쓰도록 한다.
stock.py를 생성하고 kinesis stream으로 쓰도록 한다.
cd ~
cat << EOF > stock.py
import datetime
import json
import random
import boto3
STREAM_NAME = "ExampleInputStream"
def get_data():
return {
'EVENT_TIME': datetime.datetime.now().isoformat(),
'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
'PRICE': round(random.random() * 100, 2)}
def generate(stream_name, kinesis_client):
while True:
data = get_data()
print(data)
kinesis_client.put_record(
StreamName=stream_name,
Data=json.dumps(data),
PartitionKey="partitionkey")
if __name__ == '__main__':
generate(STREAM_NAME, boto3.client('kinesis'))
EOF
sudo mv stock.py /bin/
cd /bin
pip3 install boto3
python3 stock.py
# 사전에 미리 aws configure 명령어로 리전부분 기입하도록 함
flink 스트리밍
cd ~
sudo yum install git -y
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
cd amazon-kinesis-data-analytics-java-examples/GettingStarted
sed -i 's/us-west-2/ap-northeast-2/g' ./src/main/java/com/amazonaws/services/kinesisanalytics/BasicStreamingJob.java
sudo sed -i 's/${flink.version}/1.13.1/g' ./pom.xml
mvn package
위에 적었던 명령어를 복사하여 붙여 넣기를 한다.
mvn package를 진행하게 되면 다음과 같이 빌드 성공 문구가 뜬다.
이제 S3에 aws-kinesis-analytics-java-apps-1.0.jar을 업로드해 주도록 한다.
aws s3 mb s3://demo-korea
aws s3 cp target/aws-kinesis-analytics-java-apps-1.0.jar s3://demo-korea
이제 kinesis analytics를 생성해 준다.
우선 console 창에 'kinesis analytics'를 검색하도록 한다.
분석 애플리케이션 생성 부분에서 다음과 같이 구성한다. Apache Flink의 버전은 1.13을 사용하도록 한다.
애플리케이션의 이름은 MyApplication으로 지정한다.
이제 지정하고 생성하였다면 구성을 설정하도록 한다.
amazon S3 버킷은 이전에 생성해 주었던 s3://demo-korea로 지정해 준다.
객체에 대한 경로는 aws-kinesis-analytics-java-apps-1.0.jar로 지정한다.
이제 런타임 속성을 다음 표와 같이 기입하고 변경사항을 저장해 준다.
ProducerConfigProperties | flink.inputstream.initpos | LATEST |
ProducerConfigProperties | aws:region | ap-northeast-2 |
ProducerConfigProperties | AggregationEnabled | false |
ConsumerConfigProperties | aws:region | ap-northeast-2 |
ProducerConfigProperties | group.id | ProducerConfigProperties |
이제 analytics를 실행시켜 준다.
대시보드를 열면 정상적으로 동작하는 것을 확인할 수 있다.
하지만 바이트가 수신이 되지 않는 문제점이 있다, 고칠 점이 있으면 댓글로 알려줬으면 한다.
나중에 공부할 때 참고용으로 쓴 것이라 양해 좀 해주었으면 한다.
'AWS > data' 카테고리의 다른 글
S3와 EMR을 활용하여 데이터 변환작업 구성 (0) | 2023.01.10 |
---|---|
sqs + cloudwatch로 안전하게 인스턴스 관리하기 (0) | 2023.01.09 |
AWS cloudwatch-agent로 로깅하기 (0) | 2023.01.08 |
csv data athena 쿼리 (0) | 2023.01.07 |