본문 바로가기

공부/Airflow

[Airflow] DAG Log 관리 (+ DAG run log)

dag 로그를 지우기 위해 생긴 dag 로그를 지워보자!

 

Airflow에서는 DAG 로그를 자동으로 정리할 수 있는 기능을 제공하지 않는다...?!

DAG 로그는 $AIRFLOW_HOME/logs 에 저장되게 되는데 보통 주기적으로 로컬 파일을 삭제하는 DAG를 만들어 관리하게 된다. (DAG 로그를 지우기 위해 DAG 로그를 쌓고,,,무한반복)

 

DAG를 자주 실행하는 경우 필수적임.

 

로컬 파일 로그 정리 DAG

import os
import shutil
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta, timezone
from airflow import DAG
from airflow.operators.python import PythonOperator

def cleanup_old_logs():
    log_dir = '/opt/airflow/logs'
    cutoff_date = (datetime.now(timezone.utc) - timedelta(days=30)).date()
    delete_count = 0

    for dag_id in os.listdir(log_dir):
        dag_path = os.path.join(log_dir, dag_id)
        if os.path.isdir(dag_path):
            for run_id in os.listdir(dag_path):
                run_path = os.path.join(dag_path, run_id)
                if os.path.isdir(run_path):
                    run_time_str = run_id.split('__')[-1]
                    try:
                        run_time = datetime.fromisoformat(run_time_str).date()
                        if run_time < cutoff_date:
                            shutil.rmtree(run_path)
                            delete_count += 1
                            print(f"Deleted logs: {run_path}")
                    except ValueError:
                        print(f"Skipping: {run_path}, unable to parse date from {run_time_str}")
    
    print(f"Total logs deleted: {delete_count}")

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'cleanup_DAG_logs_30days',
    default_args=default_args,
    description='A DAG to cleanup old DAG logs older than 30 days',
    schedule_interval='0 2 * * *',  # 매일 새벽 2시에 실행
    start_date=days_ago(1),
    catchup=False,
)

cleanup_task = PythonOperator(
    task_id='cleanup_old_logs',
    python_callable=cleanup_old_logs,
    dag=dag,
)

cleanup_task

 

 

+ 추가로 postgresql을 사용하는 경우 dag_run 정보, task_instance 정보가 각 테스크마다 쌓이게 됨.. 


PostgreSQL 로그 정리 DAG

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'cleanup_DAG_RUN_and_task_instance_logs_30days',
    default_args=default_args,
    description='Delete old DAG_RUN and task_instance records from PostgreSQL after 30 days',
    schedule_interval='0 2 * * *',  # 매일 새벽 2시에 실행
    start_date=days_ago(1),
    catchup=False,
)

def delete_old_records():
    pg_hook = PostgresHook(postgres_conn_id='postgres_default')
    conn = pg_hook.get_conn()
    cursor = conn.cursor()
    
    cutoff_date = datetime.now() - timedelta(days=30)
    
    delete_dag_run_query = """
    DELETE FROM dag_run
    WHERE execution_date < %s;
    """
    
    delete_task_instance_query = """
    DELETE FROM task_instance
    WHERE execution_date < %s;
    """
    
    cursor.execute(delete_dag_run_query, (cutoff_date,))
    cursor.execute(delete_task_instance_query, (cutoff_date,))
    
    conn.commit()
    cursor.close()
    conn.close()

delete_old_records_task = PythonOperator(
    task_id='delete_old_records',
    python_callable=delete_old_records,
    dag=dag,
)

delete_old_records_task