
AWS Serverless Services: Lambda - Knoldus Blogs

 3 years ago
source link: https://blog.knoldus.com/aws-lambda-query-athena-using-lambda/

Hi folks, next in our serverless series is AWS Lambda. If you haven't read my previous post in the series, take a look at it here: https://blog.knoldus.com/aws-serverless-services-athena/

What is AWS Lambda

AWS Lambda is a compute service that lets you run code without provisioning or managing servers. In practice, this means you write your code and Lambda takes care of the infrastructure: it handles system-level tasks such as scaling capacity up and down, maintenance, security, and logging for you. You only have to provide the code in a language that Lambda supports, which at the time of writing includes Python, Node.js, Java, Go, and Ruby.

We can use AWS Lambda to run our code in response to events, such as changes to data in an Amazon S3 bucket or an Amazon DynamoDB table; to run our code in response to HTTP requests through Amazon API Gateway; or to invoke our code directly with API calls made through the AWS SDKs. With these capabilities, we can use Lambda to build data processing triggers for AWS services like Amazon S3 and Amazon DynamoDB, process streaming data stored in Kinesis, or create our own back end that operates at AWS scale, performance, and security.
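
As a minimal sketch of the event-driven model described above, the handler below pulls the bucket name and object key out of an S3 notification event. The helper name `extract_s3_objects` and the sample event are illustrative assumptions; only the `Records` / `s3` / `bucket` / `object` layout follows the S3 event notification format.

```python
from urllib.parse import unquote_plus


def extract_s3_objects(event):
    """Return (bucket, key) pairs from an S3 notification event."""
    objects = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded in S3 events, so decode them.
        key = unquote_plus(record["s3"]["object"]["key"])
        objects.append((bucket, key))
    return objects


def lambda_handler(event, context):
    # Hypothetical handler: just report which objects triggered the call.
    return {"objects": extract_s3_objects(event)}


# Hand-written sample event for local experimentation (not real data).
sample_event = {
    "Records": [
        {"s3": {"bucket": {"name": "example-bucket"},
                "object": {"key": "reports/2020+Q1.csv"}}}
    ]
}
```

Calling `lambda_handler(sample_event, None)` locally is a quick way to check the parsing logic before wiring up a real S3 trigger.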

We can build a complete serverless architecture in which backend calls are handled by Lambda functions, triggered either by events from other AWS services or by our own code.

Creating a Lambda

Though you can write Lambda functions in any of the languages Lambda supports, we will use Python for our use case.

Lambda provides runtimes for Python that execute your code to process events. Your code runs in an environment that includes the SDK for Python (Boto3), with credentials from an AWS Identity and Access Management (IAM) role that you manage.

  1. Open the Lambda Console
  2. Choose Create Function
  3. Click Create from scratch
  4. Give your Lambda a name
  5. Select Python 3.8
  6. Click Create Function

When the function is created, it contains a default handler function:
def lambda_handler(event, context):

The lambda_function file exports a function named lambda_handler that takes an event object and a context object. This is the handler function that Lambda calls when the function is invoked. The Python function runtime gets invocation events from Lambda and passes them to the handler. In the function configuration, the handler value is lambda_function.lambda_handler.
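
To make the handler contract concrete, here is a small sketch of a handler that reads a field from the event and the request ID from the context, invoked locally. `FakeContext` is a hypothetical stand-in written for this example; in Lambda the runtime supplies the real context object, which exposes attributes such as `aws_request_id` and `function_name`.

```python
class FakeContext:
    """Local stand-in for the Lambda context object (an assumption for testing)."""
    function_name = "demo-function"
    aws_request_id = "00000000-0000-0000-0000-000000000000"

    def get_remaining_time_in_millis(self):
        # The real method reports the time left before the function times out.
        return 3000


def lambda_handler(event, context):
    # `event` is the decoded invocation payload; for JSON input it is usually a dict.
    name = event.get("name", "world")
    return {
        "message": "Hello, {}!".format(name),
        "requestId": context.aws_request_id,
    }


# Local invocation with a hand-written event.
result = lambda_handler({"name": "Lambda"}, FakeContext())
print(result)
```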

Example

Since we already know about AWS Athena, let's integrate it with Lambda so that we can query Athena from a Lambda function and get the results.

We will first write the code that connects to Athena and starts the query:

    client = boto3.client('athena')
    query_start = client.start_query_execution(
        QueryString= query.format(table_name),
        QueryExecutionContext={
            'Database': db_name
        },
        ResultConfiguration={
            'OutputLocation': output_location
        }
    )

    query_id = query_start['QueryExecutionId']

When you query Athena, it returns the result as quickly as it can, but there is no defined SLA: some queries respond within a second, while others take several seconds or more. Our code needs to tolerate that delay. So we poll the query status for a fixed number of iterations, waiting 2 seconds after each iteration in which the result is not yet available.

while (total_iterations > 0):
    total_iterations -= 1

    queryExecutionObject = client.get_query_execution(QueryExecutionId=query_id)

    request_id = queryExecutionObject['ResponseMetadata']['RequestId']
    status = queryExecutionObject['QueryExecution']['Status']['State']

    if status == "FAILED" or status == "CANCELLED":
        raise Exception("Query unsuccessful with status: {} for requestId: {}".format(status, request_id))
    elif status == "SUCCEEDED":
        results = client.get_query_results(QueryExecutionId=query_id)
        rows = results['ResultSet']['Rows']
        break  # results are in; stop polling
    else:
        time.sleep(2)  # query is still queued or running
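
The polling loop above can also be factored into a small helper that is easy to test without AWS. This is only a sketch under stated assumptions: `wait_for_query` and its parameters are hypothetical names, and the state is supplied by any zero-argument callable rather than by a Boto3 client directly.

```python
import time


def wait_for_query(get_state, max_attempts=20, delay_seconds=2.0, sleep=time.sleep):
    """Poll get_state() until it returns a terminal Athena state.

    get_state is any zero-argument callable returning the current state string.
    """
    for _ in range(max_attempts):
        state = get_state()
        if state == "SUCCEEDED":
            return state
        if state in ("FAILED", "CANCELLED"):
            raise RuntimeError("Query ended with status: {}".format(state))
        sleep(delay_seconds)  # QUEUED or RUNNING: wait, then poll again
    raise TimeoutError("Query did not finish after {} attempts".format(max_attempts))


# Simulated state sequence; no waiting, so the example runs instantly.
states = iter(["QUEUED", "RUNNING", "SUCCEEDED"])
final_state = wait_for_query(lambda: next(states), sleep=lambda seconds: None)
```

With Boto3, `get_state` could be a lambda that calls `client.get_query_execution(QueryExecutionId=query_id)` and returns `['QueryExecution']['Status']['State']`. Raising on timeout makes the "query never finished" case explicit instead of falling through with no rows.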

The first element of rows is the header row containing the column names; we would need it if we wanted to build JSON objects keyed by column. Here we only want the data, so we iterate from the second element onward.

data = [ data_row["Data"][0]["VarCharValue"] for data_row in rows[1:] ]
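
If the query returns more than one column, the header row can be used to build one dictionary per data row. The sketch below assumes rows shaped like `results['ResultSet']['Rows']`; the helper name `rows_to_dicts` and the sample rows are made up for illustration.

```python
def rows_to_dicts(rows):
    """Convert Athena ResultSet rows into dicts keyed by column name."""
    if not rows:
        return []
    # The first row of a SELECT result holds the column names.
    header = [cell.get("VarCharValue") for cell in rows[0]["Data"]]
    records = []
    for row in rows[1:]:
        # A NULL cell has no "VarCharValue" key, so .get() yields None.
        values = [cell.get("VarCharValue") for cell in row["Data"]]
        records.append(dict(zip(header, values)))
    return records


# Hand-written sample shaped like results['ResultSet']['Rows'] (not real output).
sample_rows = [
    {"Data": [{"VarCharValue": "name"}, {"VarCharValue": "city"}]},
    {"Data": [{"VarCharValue": "alice"}, {"VarCharValue": "Delhi"}]},
    {"Data": [{"VarCharValue": "bob"}, {}]},
]
records = rows_to_dicts(sample_rows)
```

Using `.get("VarCharValue")` rather than indexing avoids a `KeyError` when a cell is NULL.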

Putting it all together, our final code is:

import json
import boto3
import time

query = """SELECT DISTINCT(name) FROM {0};"""

db_name = 'testing_db_v1'
output_location = 's3://athena-bucket/AthenaQueryResults'
table_name = 'reports'

def lambda_handler(event, context):
    
    total_iterations = 20    
    
    client = boto3.client('athena')
    query_start = client.start_query_execution(
        QueryString= query.format(table_name),
        QueryExecutionContext={
            'Database': db_name
        },
        ResultConfiguration={
            'OutputLocation': output_location
        }
    )

    query_id = query_start['QueryExecutionId']
    
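    rows = None
    while (total_iterations > 0):
        total_iterations -= 1

        queryExecutionObject = client.get_query_execution(QueryExecutionId=query_id)

        request_id = queryExecutionObject['ResponseMetadata']['RequestId']
        status = queryExecutionObject['QueryExecution']['Status']['State']

        if status == "FAILED" or status == "CANCELLED":
            raise Exception("Query unsuccessful with status: {} for requestId: {}".format(status, request_id))
        elif status == "SUCCEEDED":
            results = client.get_query_results(QueryExecutionId=query_id)
            rows = results['ResultSet']['Rows']
            break  # results are in; stop polling
        else:
            time.sleep(2)  # query is still queued or running

    # Fail clearly if the query never reached SUCCEEDED within the polling limit.
    if rows is None:
        raise Exception("Query {} did not finish within the polling limit".format(query_id))

    # Skip the header row and keep the first column of each data row.
    data = [ data_row["Data"][0]["VarCharValue"] for data_row in rows[1:] ]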
    
    responseObj = {}
    responseObj['requestId'] = queryExecutionObject['ResponseMetadata']['RequestId']
    responseObj['names'] = data
    print(responseObj)
    return responseObj

In this blog we have seen how to use another serverless service, AWS Lambda, and have integrated it with AWS Athena, which we covered previously.

Stay tuned for upcoming AWS Serverless Services.

