Page tree
Skip to end of metadata
Go to start of metadata

다음 구조를 만드는게 목표입니다.


1.. 영화 데이터를 서빙하는 rating api가 5000번에 띄워져 있습니다.

2. airflow는 rating api로부터 정보를 가져오고, 가져온 데이터로 랭킹 데이터를 생성합니다.


chapter 8 경로에서 도커를 실행합니다.

docker-compose up -d --build


5000번에 rating api가 띄워져 있는지 확인해 보았습니다.

아직 5000번에 rating api가 띄워지지 않았군요.


rating api 띄우는 중 에러가 발생했습니다.

Traceback (most recent call last):
  File "/app.py", line 6, in <module>
    from flask import Flask, jsonify, request
  File "/usr/local/lib/python3.8/site-packages/flask/__init__.py", line 14, in <module>
    from jinja2 import escape
ImportError: cannot import name 'escape' from 'jinja2' (/usr/local/lib/python3.8/site-packages/jinja2/__init__.py)

https://stackoverflow.com/questions/71718167/importerror-cannot-import-name-escape-from-jinja2

Flask 버전을 올리면 된다고 하네요.

관련한 버전들을 수정했습니다.

Flask==2.1.0
pandas==0.25.2
Flask-HTTPAuth==4.6.0
click==8.0.4




서버 실행에 성공했습니다.


ratings api도 잘 호출됩니다.


날짜로 조회하는 것도 됩니다.

/ratings?start_date=2022-10-01&end_date=2022-01-02

다만 최신 날짜로 쌓인 데이터는 없는 것 같네요.

8.1 fetch +rating DAG 만들기

이제 AIRFLOW에서 주기적으로 rating api에서 fetch할 수 있도록, 코드를 만들겠습니다.

사실 이미 github에 이미 구현되어 있습니다.

https://github.com/BasPH/data-pipelines-with-apache-airflow/blob/master/chapter08/dags/01_python.py

def _get_ratings(start_date, end_date, batch_size=100):
    session, base_url = _get_session()

    yield from _get_with_pagination(
        session=session,
        url=base_url + "/ratings",
        params={"start_date": start_date, "end_date": end_date},
        batch_size=batch_size,
    )

yield from 구문을 처음 보아서 밑에 적었습니다.

https://docs.python.org/3/reference/expressions.html#yieldexpr

asynchronous하게 데이터를 가져오고 싶을 때 쓰는 것이라고 이해했습니다.



실행해 보겠습니다.


output 결과를 확인하겠습니다.

그런데, airflow scheduler나 webserver container에 들어가도 csv output 파일을 찾지 못했습니다.

(/data/python/ratings/{{ds}}.json)


/var/lib/docker/volumes 경로로 들어가서 ,output 파일을 확인했습니다.

ratings와 rankings 파일은 잘 들어갔습니다.


rankings 파일:

hook 만들기

두 번째 실습은 hook을 만드는 것입니다.

hook도 이미 구현되어 있습니다. (랭킹 로직은 구현되어 있지 않습니다.)

https://github.com/BasPH/data-pipelines-with-apache-airflow/blob/master/chapter08/dags/02_hook.py

connection을 재사용한다는 점이 지난 python DAG와 가장 큰 차이인 것 같습니다.


hook 단어가 무엇인지 궁금해 검색했습니다.

hook is a place and usually an interface provided in packaged code that allows a programmer to insert customized programming

개발자가 customizing할 수 있는 코드를 의미로 이해했습니다.


custom operator

아래 코드에 구현되어 있습니다.

https://github.com/BasPH/data-pipelines-with-apache-airflow/blob/master/chapter08/dags/03_operator.py

output path를 설정하는 곳을 두 곳에서 한 곳으로 줄였군요.

hook과 마찬가지로 랭킹 로직은 구현되어 있지 않습니다.

sensor

https://github.com/BasPH/data-pipelines-with-apache-airflow/blob/40b52ba6fcda3203b194be9e1c2850135997215a/chapter08/dags/custom/sensors.py

  • No labels

1 Comment

  1. 개인적 궁금증

    Q. hook, operator, sensor 의 수정이 필요하면 매번, 빌드하고 배포해야할까?

    • 문제가 있다면, 다른 워크플로우에 영향을 줄텐데


    Q. k8s에서 배포한다면 무거워질텐데 이미지가, 관련 패키지 설치하면서 맞는 방향일까?


    Q. Operator의 경우 어디까지 로직을 넣어야할까? 

    • 고민이됨. 


    Q. op_kwargs와 template_dict는 차이가 뭘까? 

    • context 안에 둘다 있음, 둘다 k:v 
      •  PythonOperator(
            task_id='python_task',
            python_callable= _my_func,
            op_kwargs={
                "arg1":"hello",
                "execution_date": "{{execution_date.in_timezone('Asia/Seoul').strftime('%Y-%m-%d')}}" 
            },
            depends_on_past=True 
      • 'yesterday_ds': '2022-05-02', 'yesterday_ds_nodash': '20220502', 'arg1': 'hello', 'templates_dict': None}