#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Forward Kinesis event records to Elasticsearch over HTTP(S).

Configuration is read from environment variables at import time:

* ``ES_BASE_URL`` -- ``http://host[:port]``, ``https://host[:port]`` or a bare
  ``host[:port]`` (default ``http://localhost:9200``).
* ``ES_USERNAME`` / ``ES_PASSWORD`` -- when both are set, HTTP Basic
  authentication is sent with every request.
* ``ES_INDEX`` -- index name prefix (default ``logstash``); the current date
  formatted as ``%Y.%m.%d`` is appended to it.
* ``ES_TYPE`` -- document type used in the request path (default ``default``).

Run as a script with one or more JSON files as arguments to exercise the
handler locally.
"""
import os
import sys
import base64
import json
import time
import http.client


def _open_connection(base_url):
    """Return an http.client connection object for *base_url*.

    The scheme prefix selects HTTPS or plain HTTP; a value without a scheme
    is treated as ``host[:port]`` over plain HTTP.  A trailing ``/`` is
    dropped because http.client would otherwise fail to parse the port.
    No network traffic happens here; the socket is opened on first request.
    """
    https_prefix = 'https://'
    http_prefix = 'http://'
    if base_url.startswith(https_prefix):
        return http.client.HTTPSConnection(base_url[len(https_prefix):].rstrip('/'))
    if base_url.startswith(http_prefix):
        # Strip the *http* prefix here (the scheme must not end up in the host).
        return http.client.HTTPConnection(base_url[len(http_prefix):].rstrip('/'))
    # Scheme is not specified, expecting hostname with/without port.
    return http.client.HTTPConnection(base_url.rstrip('/'))


es_base_url = os.environ.get('ES_BASE_URL', 'http://localhost:9200')
conn = _open_connection(es_base_url)

es_username = os.environ.get('ES_USERNAME')
es_password = os.environ.get('ES_PASSWORD')
# Always define basic_auth so the handler can test it without a NameError.
basic_auth = None
if es_username and es_password:
    credentials = (es_username + ":" + es_password).encode()
    basic_auth = 'Basic %s' % base64.b64encode(credentials).decode('ascii')

es_index = os.environ.get('ES_INDEX', 'logstash')
es_type = os.environ.get('ES_TYPE', 'default')


def _post_document(document):
    """POST *document* as JSON to the dated index and print the response.

    Errors are reported on stdout and not re-raised, so one failing record
    does not stop the remaining records from being processed.
    """
    headers = {'content-type': 'application/json'}
    if basic_auth:
        headers['Authorization'] = basic_auth
    path = '/' + es_index + '-' + time.strftime('%Y.%m.%d') + '/' + es_type
    try:
        conn.request('POST', path, body=json.dumps(document), headers=headers)
        res = conn.getresponse()
        print(res.status, res.reason)
        print(res.read())
    except Exception:
        print("Unexpected error:", sys.exc_info()[0])
    finally:
        # Close even after a failure so the shared connection is reset and
        # the next request can reopen it cleanly.
        conn.close()


def lambda_handler(event, context):
    """Send the ``payloads`` value of every Kinesis record in *event*.

    Args:
        event: mapping with an optional ``Records`` list.  Entries that have
            no truthy ``kinesis`` key are ignored.  For the others,
            ``record['kinesis']['data']`` is base64-decoded, parsed as JSON,
            and the value under its ``payloads`` key is posted.
        context: unused.

    Returns:
        Always ``True``; per-record failures are printed, not raised.
    """
    # print("Received event: "+json.dumps(event))
    for record in event.get('Records', []):
        kinesis = record.get('kinesis')
        if not kinesis:
            continue
        try:
            decoded = base64.b64decode(kinesis.get('data'))
            data = json.loads(decoded).get('payloads')
        except Exception:
            print("Unexpected error:", sys.exc_info()[0])
            continue
        # print(data)
        _post_document(data)
    return True


# To test locally, specify JSON files.
if __name__ == '__main__':
    # argv[0] is the script itself, so real arguments start at index 1.
    if len(sys.argv) > 1:
        records = []
        for path in sys.argv[1:]:
            with open(path) as fh:
                data = fh.read()
            records.append(
                {"kinesis": {"data": base64.b64encode(str.encode(data)).decode('ascii')}}
            )
        print(json.dumps(lambda_handler({"Records": records}, {})))
    else:
        print("usage: " + sys.argv[0] + " /path/to/event.json")