AWS Lambda のタイムアウトをJava で作成するLambda関数に捕捉させる要件。
こういう要件、なんで Python で作らないんだという声は置いといて。。。
タイムアウトする Lambda は、ログ出力は、awslogs で CloudWatch で閲覧が可能という前提
Lambda のタイムアウト → AWSが、"Task timed out" のログを出す
⇒ Cloud Watch Log をトリガーに、フィルターで、"Task timed out" を捕捉
⇒ トリガー設定した Lambda にイベント通知で該当ログ "Task timed out" と
ロググループ、ログストリーム名が送られてくる
⇒ Cloud Watch を参照する Client を作成してタイムアウト発生したログを参照
Cloud Watch を参照する Client は、AWS SDK-Java version 2 を使うので、
Maven 依存関係
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>cloudwatchlogs</artifactId>
<version>2.17.168</version>
</dependency>
Cloud Watch Log をトリガーから、タイムアウト捕捉するLamnda に送られてくるデータ(JSON)は。
gzip 圧縮されて更に Base64 エンコードされたデータで、Lambdaリクエストハンドラに渡されるので、gzip 解凍の為に
Apache common の commons-compress を使って gzip 解凍する
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.21</version>
</dependency>
あとついでに、以下、パターンマッチを簡単に書くために
ReturnalConsumer を使いたくて、、
<dependency>
<groupId>org.yipuran.core</groupId>
<artifactId>yipuran-core</artifactId>
<version>4.32</version>
</dependency>
必要なインポート
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsRequest;
import org.yipuran.function.ReturnalConsumer;
タイムアウト捕捉する Lambda Handler
public class TimeoutCatchHandler implements RequestHandler<InputStream, String>{
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public String handleRequest(InputStream input, Context context){
ObjectMapper mapper = new ObjectMapper();
try{
Map<String, Map<String, Object>> map = mapper.readValue(input, new TypeReference<Map<String, Map<String, Object>>>(){});
String data = map.get("awslogs").get("data").toString();
logger.info("map = " + map);
String json = readLog(Base64.getDecoder().decode(data));
logger.info("json = " + json);
AwsLogCatch logcatch = mapper.readValue(json, new TypeReference<AwsLogCatch>(){});
String message = logcatch.getLogEvents().get(0).getMessage();
logger.info("messageType = " + logcatch.getMessageType());
logger.info("logGroup = " + logcatch.getLogGroup());
logger.info("logStream = " + logcatch.getLogStream());
logger.info("getLogEvents() = " + logcatch.getLogEvents());
logger.info("id = " + logcatch.getLogEvents().get(0).getId());
logger.info("timestamp = " + logcatch.getLogEvents().get(0).getTimestamp());
logger.info("message = " + message);
int begin = ReturnalConsumer.of(Matcher.class).with(Matcher::find).apply(Pattern.compile(
"\\d{4}\\-(0[1-9]|1[012])\\-(0[1-9]|[12][0-9]|3[01])T(0[0-9]|1[0-9]|2[0-3]):(0[0-9]|[0-5][0-9]):(0[0-9]|[0-5][0-9])\\.[0-9]{3}Z"
).matcher(message)).end();
int end = ReturnalConsumer.of(Matcher.class).with(Matcher::find).apply(Pattern.compile("Task timed out").matcher(message)).start();
String requestId = message.substring(begin, end).trim();
logger.info("requestId = " + requestId);
CloudWatchLogsClient client = CloudWatchLogsClient.builder().region(Region.of”ap-northeast-1”)).build();
GetLogEventsRequest getLogEventsRequest = GetLogEventsRequest.builder()
.logGroupName(logcatch.getLogGroup())
.logStreamName(logcatch.getLogStream())
.startFromHead(true)
.build();
client.getLogEvents(getLogEventsRequest).events().stream().forEach(event->{
logger.info(event.message());
});
}catch(IOException e){
logger.error(e.getMessage(), e);
}
return null;
}
private String readLog(byte[] input) {
try(ByteArrayInputStream bin = new ByteArrayInputStream(input);GzipCompressorInputStream gin = new GzipCompressorInputStream(bin)){
ByteArrayOutputStream bo = new ByteArrayOutputStream();
int size = 0;
byte[] buf = new byte[1024];
while((size = gin.read(buf)) > 0){
bo.write(buf, 0, size);
bo.flush();
}
bo.close();
return new String(bo.toByteArray(), StandardCharsets.UTF_8);
}catch(IOException e){
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
トリガーが送ったイベントデータは、RequestHandler InputStream で受け取るデータの "data"キーで受け取った
データ JSON が、gzip を圧縮されて Base64 エンコードされているので、
private String readLog(byte[] input) のメソッドで、Base64 デコードしたデータから
gzip 解凍して抽出したJSONを、以下の 読みやすい解析のための任意のクラスに、
Jacksonで変換する。
import java.io.Serializable;
import java.util.List;
import lombok.Data;
AwsLogCatch
@Data
public class AwsLogCatch implements Serializable{
private static final long serialVersionUID = 1L;
private String messageType;
private String owner;
private String logGroup;
private String logStream;
private List<String> subscriptionFilters;
private List<AwsLogEvent> logEvents;
}
import java.io.Serializable;
import lombok.Data;
CloudWatch トリガー フィルターに引っ掛かったログ
@Data
public class AwsLogEvent implements Serializable{
private static final long serialVersionUID = 1L;
private String id;
private long timestamp;
private String message;
}
これが、トリガー フィルターに引っ掛かったログを解析するもので、
AwsLogCatch logcatch = mapper.readValue(json, new TypeReference<AwsLogCatch>(){});
Cloud Watch を参照する Client は、AWS SDK-Java version 2 は、
CloudWatchLogsClient client を生成して、logGroup , logStream を指定して
読み込むのである。
必要なロール権限ポリシーは、CloudWatchLogsReadOnlyAccess
Python の CloudWatchLogs — Boto3 Docs 1.21.42 documentation
に相当するクライアントで GetLogEventsRequest を作って
CloudWatchLogsClient の getLogEventsメソッドの evebts()でようやくタイムアウト発生した Lambda のログを
参照することができる。
上の
logger.info("timestamp = " + logcatch.getLogEvents().get(0).getTimestamp());
で取得している時刻は、epoc ミリ秒なので
long epocmili = Long.parseLong(logcatch.getLogEvents().get(0).getTimestamp()));
LocalDateTime time = Instant.ofEpochMilli(epocmili).atZone(ZoneId.of("Asia/Tokyo")).toLocalDateTime();
とすれば、LocalDateTime を見ることができるだろう。