Airflow/Luigi for AWS EMR automatic cluster creation and pyspark deployment

I am new to Airflow automation. I don't know whether this is possible with Apache Airflow (or Luigi, etc.), or whether I should just write one long bash script.

I want to build a DAG for the following:

  1. Create/clone a cluster on AWS EMR
  2. Install python requirements
  3. Install pyspark related libraries
  4. Get latest code from github
  5. Submit spark job
  6. Terminate cluster on finish

For the individual steps I can write .sh files like the ones below (I am not sure whether that is a good idea), but I don't know how to do this in Airflow.

1) Creating a cluster with cluster.sh

 aws emr create-cluster \
    --name "1-node dummy cluster" \
    --instance-type m3.xlarge \
    --release-label emr-4.1.0 \
    --instance-count 1 \
    --use-default-roles \
    --applications Name=Spark \
    --auto-terminate

2, 3 & 4) Clone the git repo and install requirements with codesetup.sh

git clone some-repo.git
pip install -r requirements.txt
mv xyz.jar /usr/lib/spark/xyz.jar

5) Running the spark job with sparkjob.sh

aws emr add-steps --cluster-id <Your EMR cluster id> --steps Type=spark,Name=TestJob,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,pythonjob.py,s3a://your-source-bucket/data/data.csv,s3a://your-destination-bucket/test-output/],ActionOnFailure=CONTINUE

6) Not sure, maybe this

aws emr terminate-clusters --cluster-ids <value> [<value>...]

Finally, all of this can be run as one .sh file. I would like to know a good approach to doing this with Airflow/Luigi.

What I found:

I found this post to be close, but it is outdated (2016) and is missing the connections and the code for the playbooks.

https://www.agari.com/email-security-blog/automated-model-building-emr-spark-airflow/



Solution 1:[1]

I figured out that there are two options for doing this.

1) Write a bash script using emr create-cluster and add-steps, then schedule it with Airflow's BashOperator.

Alternatively, there is a wrapper around these two commands called sparksteps.

An example from their documentation:

sparksteps examples/episodes.py \
  --s3-bucket $AWS_S3_BUCKET \
  --aws-region us-east-1 \
  --release-label emr-4.7.0 \
  --uploads examples/lib examples/episodes.avro \
  --submit-args="--deploy-mode client --jars /home/hadoop/lib/spark-avro_2.10-2.0.2-custom.jar" \
  --app-args="--input /home/hadoop/episodes.avro" \
  --tags Application="Spark Steps" \
  --debug

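You can write a .sh script with the default options of your choice. Once the script is ready, you can call it from Airflow's BashOperator as below:

# Airflow 1.x import path; in Airflow 2 use airflow.operators.bash
from airflow.operators.bash_operator import BashOperator

# The trailing space is deliberate: a bash_command ending in ".sh" is
# treated by Airflow as a Jinja template file to load.
create_command = "sparkstep_custom.sh "

t1 = BashOperator(
    task_id='create_file',
    bash_command=create_command,
    dag=dag
)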

2) You can use Airflow's own operators for AWS to do this:

  1. EmrCreateJobFlowOperator (for launching the cluster)
  2. EmrAddStepsOperator (for submitting the Spark job)
  3. EmrStepSensor (to track when a step finishes)
  4. EmrTerminateJobFlowOperator (to terminate the cluster when the step finishes)

A basic example that creates a cluster and submits steps:

# EMR steps run in order: copy the script and its dependency to the
# master node, then run spark-submit on the copied files.
my_steps = [
    {
        'Name': 'setup - copy files',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['aws', 's3', 'cp', S3_URI + 'test.py', '/home/hadoop/']
        }
    },
    {
        'Name': 'setup - copy files 3',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['aws', 's3', 'cp', S3_URI + 'myfiledependecy.py', '/home/hadoop/']
        }
    },
    {
        'Name': 'Run Spark',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit', '--jars', 'jar1.jar,jar2.jar', '--py-files', '/home/hadoop/myfiledependecy.py', '/home/hadoop/test.py']
        }
    }
]
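
Writing each step dictionary by hand gets repetitive, so the same list can also be generated with two small helpers. This is only a sketch: copy_step and spark_submit_step are hypothetical names (not part of Airflow or boto3), and the S3_URI value is a placeholder. The dictionaries they return have the same shape as the hand-written ones above.

```python
S3_URI = "s3://your_bucket_name/"  # placeholder bucket, replace with your own


def copy_step(name, s3_key, dest="/home/hadoop/"):
    """Hypothetical helper: EMR step that copies one S3 object to the master node."""
    return {
        "Name": name,
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["aws", "s3", "cp", S3_URI + s3_key, dest],
        },
    }


def spark_submit_step(name, script, jars=(), py_files=(), app_args=()):
    """Hypothetical helper: EMR step that runs spark-submit on a local script."""
    args = ["spark-submit"]
    if jars:
        args += ["--jars", ",".join(jars)]
    if py_files:
        args += ["--py-files", ",".join(py_files)]
    args.append(script)
    args += list(app_args)  # arguments passed through to the PySpark script
    return {
        "Name": name,
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {"Jar": "command-runner.jar", "Args": args},
    }


my_steps = [
    copy_step("setup - copy files", "test.py"),
    copy_step("setup - copy files 3", "myfiledependecy.py"),
    spark_submit_step(
        "Run Spark",
        "/home/hadoop/test.py",
        jars=["jar1.jar", "jar2.jar"],
        py_files=["/home/hadoop/myfiledependecy.py"],
    ),
]
```

The input and output paths from the question's add-steps command would go in app_args, for example app_args=["s3a://your-source-bucket/data/data.csv", "s3a://your-destination-bucket/test-output/"].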


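# Airflow 1.10 import paths; in Airflow 2 these classes live in the Amazon provider package.
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

# JOB_FLOW_OVERRIDES is a dict of cluster settings (boto3 run_job_flow arguments) defined elsewhere.
cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow2',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    dag=dag
)

step_adder_pre_step = EmrAddStepsOperator(
    task_id='pre_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=my_steps,
    dag=dag
)

# Watch the last submitted step ([-1]); with [0] the sensor would finish
# as soon as the first copy step is done, before the Spark job has run.
step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('pre_step', key='return_value')[-1] }}",
    aws_conn_id='aws_default',
    dag=dag
)

cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag
)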
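
The example above uses JOB_FLOW_OVERRIDES without defining it. A minimal sketch of what it could look like is below, assuming the same settings as the create-cluster command in the question (one m3.xlarge node, emr-4.1.0, Spark, default roles). The keys follow the arguments of boto3's run_job_flow. Unlike --auto-terminate in the CLI version, this sketch keeps the cluster alive when it has no steps, on the assumption that the steps are added afterwards by a separate task.

```python
# Cluster settings passed to EmrCreateJobFlowOperator(job_flow_overrides=...).
# Values mirror the question's `aws emr create-cluster` command; adjust as needed.
JOB_FLOW_OVERRIDES = {
    "Name": "1-node dummy cluster",
    "ReleaseLabel": "emr-4.1.0",
    "Applications": [{"Name": "Spark"}],
    "Instances": {
        "MasterInstanceType": "m3.xlarge",
        "InstanceCount": 1,
        # Assumption: keep the cluster up until the terminate task runs,
        # because steps are submitted after creation.
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False,
    },
    # Equivalent of --use-default-roles in the CLI.
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}
```

The four tasks then need an explicit order in the DAG, for example cluster_creator >> step_adder_pre_step >> step_checker >> cluster_remover.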

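Also, to upload code to S3 (I wanted to get the latest code from GitHub), this can be done with S3, boto3 and PythonOperator.

A simple example:

import boto3
# Airflow 1.x import path; in Airflow 2 use airflow.operators.python
from airflow.operators.python_operator import PythonOperator

S3_BUCKET = 'your_bucket_name'
S3_URI = 's3://{bucket}/'.format(bucket=S3_BUCKET)
s3 = boto3.resource('s3')

def upload_file_to_S3(filename, key, bucket_name):
    # Upload a local file to the given bucket under the given key
    s3.Bucket(bucket_name).upload_file(filename, key)

# configdata is assumed to be a dict defined elsewhere that holds the local project path
upload_to_S3_task = PythonOperator(
    task_id='upload_to_S3',
    python_callable=upload_file_to_S3,
    op_kwargs={
        'filename': configdata['project_path']+'test.py',
        'key': 'test.py',
        'bucket_name': 'dep-buck',
    },
    dag=dag)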
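
If several files have to be uploaded (the script plus its dependencies), the op_kwargs dictionaries can be generated instead of written out one by one. A small sketch, where build_upload_kwargs is a hypothetical helper and the project path and file names are placeholders:

```python
import os


def build_upload_kwargs(project_path, filenames, bucket_name):
    """Hypothetical helper: one op_kwargs dict per file to upload."""
    return [
        {
            "filename": os.path.join(project_path, name),  # local file
            "key": name,                                    # S3 object key
            "bucket_name": bucket_name,
        }
        for name in filenames
    ]


uploads = build_upload_kwargs(
    "/path/to/project", ["test.py", "myfiledependecy.py"], "dep-buck"
)
```

Each entry in uploads can then be passed as op_kwargs to its own PythonOperator, with a distinct task_id per file.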

Solution 2:[2]

Airflow has operators for this; see the Airflow documentation.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2 Kamrus