From d3a3de32cb73a349eff59e3416c96d1228bc6734 Mon Sep 17 00:00:00 2001 From: Nhat Nam Nguyen <67188030+henrynamnguyen@users.noreply.github.com> Date: Sat, 10 Sep 2022 00:01:08 -0400 Subject: [PATCH] first commit --- .gitignore | 5 +- .idea/workspace.xml | 67 +++++- pom.xml | 96 +++++++- .../awslextest/AudioEventsSubscription.java | 224 +++++++++++++++++- .../com/example/awslextest/AudioResponse.java | 58 ++++- .../awslextest/BotResponseHandler.java | 193 ++++++++++++++- .../example/awslextest/EventsPublisher.java | 38 ++- .../LexBidirectionalStreamingExample.java | 96 +++++++- .../com/example/awslextest/SoundRecorder.java | 197 ++++++++++++++- 9 files changed, 951 insertions(+), 23 deletions(-) diff --git a/.gitignore b/.gitignore index 5ff6309..82b1c1b 100644 --- a/.gitignore +++ b/.gitignore @@ -35,4 +35,7 @@ build/ .vscode/ ### Mac OS ### -.DS_Store \ No newline at end of file +.DS_Store + +.env +/voice_sig_files/ \ No newline at end of file diff --git a/.idea/workspace.xml b/.idea/workspace.xml index d73829b..ae0a79b 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -4,29 +4,63 @@ + + + + + + + + file://$PROJECT_DIR$/src/main/java/com/example/awslextest/AudioEventsSubscription.java + 47 + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index 18fa80a..72940c8 100644 --- a/pom.xml +++ b/pom.xml @@ -5,13 +5,95 @@ 4.0.0 org.example - TestLexStreaming1 + TestLexStreaming 1.0-SNAPSHOT - - 8 - 8 - UTF-8 - - + + + + software.amazon.awssdk + bom + 2.17.267 + pom + import + + + + + + + software.amazon.awssdk + auth + 2.17.267 + import + + + software.amazon.awssdk + regions + 2.17.267 + import + + + software.amazon.awssdk + lexruntimev2 + 2.17.267 + import + + + software.amazon.awssdk + s3 + 2.17.267 + import + + + software.amazon.awssdk + utils + 2.17.267 + import + + + software.amazon.awssdk + sdk-core + 2.17.267 + import + + + software.amazon.awssdk + aws-core + 2.17.267 + import + + + org.reactivestreams + reactive-streams + 1.0.4 + + + org.reactivestreams + reactive-streams-tck + 1.0.4 + test + + + + com.googlecode.soundlibs + jlayer + 1.0.1-1 + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 1.8 + 1.8 + + + + + \ No newline at end of file diff --git a/src/main/java/com/example/awslextest/AudioEventsSubscription.java b/src/main/java/com/example/awslextest/AudioEventsSubscription.java index 6c85f54..cfa1335 100644 --- a/src/main/java/com/example/awslextest/AudioEventsSubscription.java +++ b/src/main/java/com/example/awslextest/AudioEventsSubscription.java @@ -1,2 +1,224 @@ -package com.example.awslextest;public class AudioEventsSubscription { +package com.example.awslextest; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.lexruntimev2.model.AudioInputEvent; +import software.amazon.awssdk.services.lexruntimev2.model.ConfigurationEvent; +import software.amazon.awssdk.services.lexruntimev2.model.DisconnectionEvent; +import software.amazon.awssdk.services.lexruntimev2.model.PlaybackCompletionEvent; +import software.amazon.awssdk.services.lexruntimev2.model.StartConversationRequestEventStream; + +import javax.sound.sampled.AudioFormat; +import javax.sound.sampled.AudioInputStream; +import javax.sound.sampled.AudioSystem; +import javax.sound.sampled.DataLine; +import javax.sound.sampled.LineUnavailableException; +import javax.sound.sampled.TargetDataLine; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; +public class AudioEventsSubscription implements Subscription{ + private static final AudioFormat MIC_FORMAT = new AudioFormat(8000, 16, 1, true, false); + private static final String AUDIO_CONTENT_TYPE = "audio/lpcm; sample-rate=8000; sample-size-bits=16; channel-count=1; is-big-endian=false"; + //private static final String RESPONSE_TYPE = "audio/pcm; sample-rate=8000"; + private static final String RESPONSE_TYPE = "audio/mpeg"; + private static final int BYTES_IN_AUDIO_CHUNK = 320; + private static final AtomicLong eventIdGenerator = new AtomicLong(0); + + private final AudioInputStream audioInputStream; + private final Subscriber subscriber; + private final EventWriter eventWriter; + private CompletableFuture eventWriterFuture; + + + public AudioEventsSubscription(Subscriber subscriber) { + this.audioInputStream = getMicStream(); + this.subscriber = subscriber; + this.eventWriter = new EventWriter(subscriber, audioInputStream); + configureConversation(); + } + + private AudioInputStream getMicStream() { + try { + DataLine.Info dataLineInfo = new DataLine.Info(TargetDataLine.class, MIC_FORMAT); + TargetDataLine targetDataLine = (TargetDataLine) AudioSystem.getLine(dataLineInfo); + + targetDataLine.open(MIC_FORMAT); + targetDataLine.start(); + + return new AudioInputStream(targetDataLine); + } catch (LineUnavailableException e) { + throw new RuntimeException(e); + } + } + + @Override + public void request(long demand) { + // If a thread to write events has not been started, start it. + if (eventWriterFuture == null) { + eventWriterFuture = CompletableFuture.runAsync(eventWriter); + } + eventWriter.addDemand(demand); + } + + @Override + public void cancel() { + subscriber.onError(new RuntimeException("stream was cancelled")); + try { + audioInputStream.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public void configureConversation() { + String eventId = "ConfigurationEvent-" + String.valueOf(eventIdGenerator.incrementAndGet()); + + ConfigurationEvent configurationEvent = StartConversationRequestEventStream + .configurationEventBuilder() + .eventId(eventId) + .clientTimestampMillis(System.currentTimeMillis()) + .responseContentType(RESPONSE_TYPE) + .build(); + + System.out.println("writing config event"); + eventWriter.writeConfigurationEvent(configurationEvent); + } + + public void disconnect() { + + String eventId = "DisconnectionEvent-" + String.valueOf(eventIdGenerator.incrementAndGet()); + + DisconnectionEvent disconnectionEvent = StartConversationRequestEventStream + .disconnectionEventBuilder() + .eventId(eventId) + .clientTimestampMillis(System.currentTimeMillis()) + .build(); + + eventWriter.writeDisconnectEvent(disconnectionEvent); + + try { + audioInputStream.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + } + //Notify the subscriber that we've finished. + public void stop() { + subscriber.onComplete(); + } + + public void playbackFinished() { + String eventId = "PlaybackCompletion-" + String.valueOf(eventIdGenerator.incrementAndGet()); + + PlaybackCompletionEvent playbackCompletionEvent = StartConversationRequestEventStream + .playbackCompletionEventBuilder() + .eventId(eventId) + .clientTimestampMillis(System.currentTimeMillis()) + .build(); + + eventWriter.writePlaybackFinishedEvent(playbackCompletionEvent); + } + + private static class EventWriter implements Runnable { + private final BlockingQueue eventQueue; + private final AudioInputStream audioInputStream; + private final AtomicLong demand; + private final Subscriber subscriber; + + private boolean conversationConfigured; + + public EventWriter(Subscriber subscriber, AudioInputStream audioInputStream) { + this.eventQueue = new LinkedBlockingQueue<>(); + + this.demand = new AtomicLong(0); + this.subscriber = subscriber; + this.audioInputStream = audioInputStream; + } + + public void writeConfigurationEvent(ConfigurationEvent configurationEvent) { + eventQueue.add(configurationEvent); + } + + public void writeDisconnectEvent(DisconnectionEvent disconnectionEvent) { + eventQueue.add(disconnectionEvent); + } + + public void writePlaybackFinishedEvent(PlaybackCompletionEvent playbackCompletionEvent) { + eventQueue.add(playbackCompletionEvent); + } + + void addDemand(long l) { + this.demand.addAndGet(l); + } + + @Override + public void run() { + try { + + while (true) { + long currentDemand = demand.get(); + + if (currentDemand > 0) { + // Try to read from queue of events. + // If nothing is in queue at this point, read the audio events directly from audio stream. + for (long i = 0; i < currentDemand; i++) { + + if (eventQueue.peek() != null) { + subscriber.onNext(eventQueue.take()); + demand.decrementAndGet(); + } else { + writeAudioEvent(); + } + } + } + } + } catch (InterruptedException e) { + throw new RuntimeException("interrupted when reading data to be sent to server"); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private void writeAudioEvent() { + byte[] bytes = new byte[BYTES_IN_AUDIO_CHUNK]; + + int numBytesRead = 0; + try { + numBytesRead = audioInputStream.read(bytes); + if (numBytesRead != -1) { + byte[] byteArrayCopy = Arrays.copyOf(bytes, numBytesRead); + + String eventId = "AudioEvent-" + String.valueOf(eventIdGenerator.incrementAndGet()); + + AudioInputEvent audioInputEvent = StartConversationRequestEventStream + .audioInputEventBuilder() + .audioChunk(SdkBytes.fromByteBuffer(ByteBuffer.wrap(byteArrayCopy))) + .contentType(AUDIO_CONTENT_TYPE) + .clientTimestampMillis(System.currentTimeMillis()) + .eventId(eventId).build(); + + //System.out.println("sending audio event:" + audioInputEvent); + subscriber.onNext(audioInputEvent); + demand.decrementAndGet(); + //System.out.println("sent audio event:" + audioInputEvent); + } else { + subscriber.onComplete(); + System.out.println("audio stream has ended"); + } + + } catch (IOException e) { + System.out.println("got an exception when reading from audio stream"); + System.err.println(e); + subscriber.onError(e); + } + } + } } diff --git a/src/main/java/com/example/awslextest/AudioResponse.java b/src/main/java/com/example/awslextest/AudioResponse.java index 03c55c5..cf2ce29 100644 --- a/src/main/java/com/example/awslextest/AudioResponse.java +++ b/src/main/java/com/example/awslextest/AudioResponse.java @@ -1,2 +1,58 @@ -package com.example.awslextest;public class AudioResponse { +package com.example.awslextest; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.util.Optional; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +public class AudioResponse extends InputStream{ + // Used to convert byte, which is signed in Java, to positive integer (unsigned) + private static final int UNSIGNED_BYTE_MASK = 0xFF; + private static final long POLL_INTERVAL_MS = 10; + + private final LinkedBlockingQueue byteQueue = new LinkedBlockingQueue<>(); + + private volatile boolean closed; + + @Override + public int read() throws IOException { + try { + Optional maybeInt; + while (true) { + maybeInt = Optional.ofNullable(this.byteQueue.poll(POLL_INTERVAL_MS, TimeUnit.MILLISECONDS)); + + // If we get an integer from the queue, return it. + if (maybeInt.isPresent()) { + return maybeInt.get(); + } + + // If the stream is closed and there is nothing queued up, return -1. + if (this.closed) { + return -1; + } + } + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + /** + * Writes data into the stream to be offered on future read() calls. + */ + public void write(byte[] byteArray) { + // Don't write into the stream if it is already closed. + if (this.closed) { + throw new UncheckedIOException(new IOException("Stream already closed when attempting to write into it.")); + } + + for (byte b : byteArray) { + this.byteQueue.add(b & UNSIGNED_BYTE_MASK); + } + } + + @Override + public void close() throws IOException { + this.closed = true; + super.close(); + } } diff --git a/src/main/java/com/example/awslextest/BotResponseHandler.java b/src/main/java/com/example/awslextest/BotResponseHandler.java index 54fccff..4ad6bcb 100644 --- a/src/main/java/com/example/awslextest/BotResponseHandler.java +++ b/src/main/java/com/example/awslextest/BotResponseHandler.java @@ -1,2 +1,193 @@ -package com.example.awslextest;public class BotResponseHandler { +package com.example.awslextest; + +import javazoom.jl.decoder.JavaLayerException; +import javazoom.jl.player.advanced.AdvancedPlayer; +import javazoom.jl.player.advanced.PlaybackEvent; +import javazoom.jl.player.advanced.PlaybackListener; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.async.SdkPublisher; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.lexruntimev2.model.AudioResponseEvent; +import software.amazon.awssdk.services.lexruntimev2.model.DialogActionType; +import software.amazon.awssdk.services.lexruntimev2.model.IntentResultEvent; +import software.amazon.awssdk.services.lexruntimev2.model.PlaybackInterruptionEvent; +import software.amazon.awssdk.services.lexruntimev2.model.StartConversationResponse; +import software.amazon.awssdk.services.lexruntimev2.model.StartConversationResponseEventStream; +import software.amazon.awssdk.services.lexruntimev2.model.StartConversationResponseHandler; +import software.amazon.awssdk.services.lexruntimev2.model.TextResponseEvent; +import software.amazon.awssdk.services.lexruntimev2.model.TranscriptEvent; + +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.core.sync.RequestBody; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Path; +import java.util.concurrent.CompletableFuture; +public class BotResponseHandler implements StartConversationResponseHandler{ + private final EventsPublisher eventsPublisher; + + private boolean lastBotResponsePlayedBack; + private boolean isDialogStateClosed; + private AudioResponse audioResponse; + + private String sessionId; + private TextResponseEvent textResponseEvent; + private SoundRecorder recorder; + private String bucketName; + + private S3Client s3; + public BotResponseHandler(EventsPublisher eventsPublisher,String sessionId) { + this.eventsPublisher = eventsPublisher; + this.lastBotResponsePlayedBack = false;// At the start, we have not played back last response from bot. + this.isDialogStateClosed = false; // At the start, the dialog state is open. + this.sessionId = sessionId; + + this.bucketName = "demo-bucket-lex"; + String accessKey = System.getenv("AWS_ACCESS_KEY_ID"); + String secretKey = System.getenv("AWS_SECRET_KEY"); + AwsCredentialsProvider awsCredentialsProvider = StaticCredentialsProvider + .create(AwsBasicCredentials.create(accessKey, secretKey)); + this.s3 = S3Client.builder() + .region(Region.AWS_GLOBAL) + .credentialsProvider(awsCredentialsProvider) + .build(); + + this.recorder = new SoundRecorder(this.sessionId); + recorder.bootUp(); + } + + @Override + public void responseReceived(StartConversationResponse startConversationResponse) { + System.out.println("successfully established the connection with server. request id:" + startConversationResponse.responseMetadata().requestId()); // would have 2XX, request id. + } + + @Override + public void onEventStream(SdkPublisher sdkPublisher) { + + sdkPublisher.subscribe(event -> { + if (event instanceof PlaybackInterruptionEvent) { + handle((PlaybackInterruptionEvent) event); + } else if (event instanceof TranscriptEvent) { + handle((TranscriptEvent) event); + } else if (event instanceof IntentResultEvent) { + handle((IntentResultEvent) event); + } else if (event instanceof TextResponseEvent) { + handle((TextResponseEvent) event); + } else if (event instanceof AudioResponseEvent) { + handle((AudioResponseEvent) event); + } + //else if (event instanceof AudioInputEvent) { +// handle((AudioInputEvent) event); +// } + }); + } + + @Override + public void exceptionOccurred(Throwable throwable) { + System.err.println("got an exception:" + throwable); + } + + @Override + public void complete() { + System.out.println("on complete"); + } + + private void handle(PlaybackInterruptionEvent event) { + System.out.println("Got a PlaybackInterruptionEvent: " + event); + } + + private void handle(TranscriptEvent event) { + System.out.println("Got a TranscriptEvent: " + event); + } + + + private void handle(IntentResultEvent event) { + System.out.println("Got an IntentResultEvent: " + event); + isDialogStateClosed = DialogActionType.CLOSE.equals(event.sessionState().dialogAction().type()); + } + + private void handle(TextResponseEvent event) { + this.textResponseEvent = event; + System.out.println("Got an TextResponseEvent: " + event); + event.messages().forEach(message -> { + System.out.println("Message content type:" + message.contentType()); + System.out.println("Message content:" + message.content()); + }); + if (this.textResponseEvent.messages().get(0).content().equals("Please say I, henry, hereby sign this document.")){ + CompletableFuture.runAsync(() -> { + this.recorder.start(); + PutObjectRequest objectRequest = PutObjectRequest.builder() + .bucket(this.bucketName) + .key(String.format("%s.wav",this.sessionId)) + .build(); + + this.s3.putObject(objectRequest, RequestBody.fromFile(this.recorder.getWavFile())); + }); + } else { + this.recorder.finish(); + } + } + + private void handle(AudioResponseEvent event) {//Synthesize speech + // System.out.println("Got a AudioResponseEvent: " + event); + if (audioResponse == null) { + audioResponse = new AudioResponse(); + //Start an audio player in a different thread. + CompletableFuture.runAsync(() -> { + try { + AdvancedPlayer audioPlayer = new AdvancedPlayer(audioResponse); + + audioPlayer.setPlayBackListener(new PlaybackListener() { + @Override + public void playbackFinished(PlaybackEvent evt) { + super.playbackFinished(evt); + + // Inform the Amazon Lex bot that the playback has finished. + eventsPublisher.playbackFinished(); + if (isDialogStateClosed) { + lastBotResponsePlayedBack = true; + } + } + }); + audioPlayer.play(); + } catch (JavaLayerException e) { + throw new RuntimeException("got an exception when using audio player", e); + } + }); + } + + if (event.audioChunk() != null) { + audioResponse.write(event.audioChunk().asByteArray()); + } else { + // The audio prompt has ended when the audio response has no + // audio bytes. + try { + audioResponse.close(); + audioResponse = null; // Prepare for the next audio prompt. + } catch (IOException e) { + throw new UncheckedIOException("got an exception when closing the audio response", e); + } + } + } + +// private void handle(AudioInputEvent event){ +// this.audioInputEvent = event; +// System.out.println("Got a AudioInputEvent: "); +// if (this.textResponseEvent.messages().get(0).content() == "") { +// CompletableFuture.runAsync(() -> { +// event.audioChunk(); +// }); +// } +// } + // The conversation with the Amazon Lex bot is complete when the bot marks the Dialog as DialogActionType.CLOSE + // and any prompt playback is finished. For more information, see + // https://docs.aws.amazon.com/lexv2/latest/dg/API_runtime_DialogAction.html. + public boolean isConversationComplete() { + return isDialogStateClosed && lastBotResponsePlayedBack; + } } diff --git a/src/main/java/com/example/awslextest/EventsPublisher.java b/src/main/java/com/example/awslextest/EventsPublisher.java index f3c1a4d..9bc8e16 100644 --- a/src/main/java/com/example/awslextest/EventsPublisher.java +++ b/src/main/java/com/example/awslextest/EventsPublisher.java @@ -1,2 +1,38 @@ -package com.example.awslextest;public class EventsPublisher { +package com.example.awslextest; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import software.amazon.awssdk.services.lexruntimev2.model.StartConversationRequestEventStream; +public class EventsPublisher implements Publisher{ + private AudioEventsSubscription audioEventsSubscription; + + @Override + public void subscribe(Subscriber subscriber) { + if (audioEventsSubscription == null) { + + audioEventsSubscription = new AudioEventsSubscription(subscriber); + subscriber.onSubscribe(audioEventsSubscription); + + } else { + throw new IllegalStateException("received unexpected subscription request"); + } + } + + public void disconnect() { + if (audioEventsSubscription != null) { + audioEventsSubscription.disconnect(); + } + } + + public void stop() { + if (audioEventsSubscription != null) { + audioEventsSubscription.stop(); + } + } + + public void playbackFinished() { + if (audioEventsSubscription != null) { + audioEventsSubscription.playbackFinished(); + } + } } diff --git a/src/main/java/com/example/awslextest/LexBidirectionalStreamingExample.java b/src/main/java/com/example/awslextest/LexBidirectionalStreamingExample.java index a20d9ec..944a3b0 100644 --- a/src/main/java/com/example/awslextest/LexBidirectionalStreamingExample.java +++ b/src/main/java/com/example/awslextest/LexBidirectionalStreamingExample.java @@ -1,2 +1,96 @@ -package com.example.awslextest;public class LexBidirectionalStreamingExample { +package com.example.awslextest; + +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.lexruntimev2.LexRuntimeV2AsyncClient; +import software.amazon.awssdk.services.lexruntimev2.model.ConversationMode; +import software.amazon.awssdk.services.lexruntimev2.model.StartConversationRequest; + +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.ListObjectsRequest; + +import java.net.URISyntaxException; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +public class LexBidirectionalStreamingExample { + public static void main(String[] args) throws URISyntaxException, InterruptedException { + String botId = System.getenv("AWS_LEX_BOT_ID "); + String botAliasId = System.getenv("AWS_LEX_BOT_ALIAS_ID"); + String localeId = "en_US"; + String accessKey = System.getenv("AWS_ACCESS_KEY_ID"); + String secretKey = System.getenv("AWS_SECRET_KEY"); + String sessionId = UUID.randomUUID().toString(); + Region region = Region.US_EAST_1; // Choose an AWS Region where the Amazon Lex Streaming API is available. + + AwsCredentialsProvider awsCredentialsProvider = StaticCredentialsProvider + .create(AwsBasicCredentials.create(accessKey, secretKey)); + + // Create a new SDK client. You need to use an asynchronous client. + System.out.println("step 1: creating a new Lex SDK client"); + LexRuntimeV2AsyncClient lexRuntimeServiceClient = LexRuntimeV2AsyncClient.builder() + .region(region) + .credentialsProvider(awsCredentialsProvider) + .build(); + + + // Configure the bot, alias and locale that you'll use to have a conversation. + System.out.println("step 2: configuring bot details"); + StartConversationRequest.Builder startConversationRequestBuilder = StartConversationRequest.builder() + .botId(botId) + .botAliasId(botAliasId) + .localeId(localeId); + + // Configure the conversation mode of the bot. By default, the + // conversation mode is audio. + System.out.println("step 3: choosing conversation mode"); + startConversationRequestBuilder = startConversationRequestBuilder.conversationMode(ConversationMode.AUDIO); + + // Assign a unique identifier for the conversation. + System.out.println("step 4: choosing a unique conversation identifier"); + startConversationRequestBuilder = startConversationRequestBuilder.sessionId(sessionId); + + // Start the initial request. + StartConversationRequest startConversationRequest = startConversationRequestBuilder.build(); + + // Create a stream of audio data to the Amazon Lex bot. The stream will start after the connection is established with the bot. + EventsPublisher eventsPublisher = new EventsPublisher(); + + // Create a class to handle responses from bot. After the server processes the user data you've streamed, the server responds + // on another stream. + BotResponseHandler botResponseHandler = new BotResponseHandler(eventsPublisher,sessionId); + + // Start a connection and pass in the publisher that streams the audio and process the responses from the bot. + System.out.println("step 5: starting the conversation ..."); + CompletableFuture conversation = lexRuntimeServiceClient.startConversation( + startConversationRequest, + eventsPublisher, + botResponseHandler + ); + + // Wait until the conversation finishes. The conversation finishes if the dialog state reaches the "Closed" state. + // The client stops the connection. If an exception occurs during the conversation, the + // client sends a disconnection event. + conversation.whenComplete((result, exception) -> { + if (exception != null) { + eventsPublisher.disconnect(); + } + }); + + // The conversation finishes when the dialog state is closed and last prompt has been played. + while (!botResponseHandler.isConversationComplete()) { + Thread.sleep(100); + } + + // Randomly sleep for 100 milliseconds to prevent JVM from exiting. + // You won't need this in your production code because your JVM is + // likely to always run. + // When the conversation finishes, the following code block stops publishing more data and informs the Amazon Lex bot that there is no more data to send. + if (botResponseHandler.isConversationComplete()) { + System.out.println("conversation is complete."); + eventsPublisher.stop(); + } + } } diff --git a/src/main/java/com/example/awslextest/SoundRecorder.java b/src/main/java/com/example/awslextest/SoundRecorder.java index cbbcaff..7d542da 100644 --- a/src/main/java/com/example/awslextest/SoundRecorder.java +++ b/src/main/java/com/example/awslextest/SoundRecorder.java @@ -1,2 +1,197 @@ -package com.example.awslextest;public class SoundRecorder { +package com.example.awslextest; +import javax.sound.sampled.*; +import java.io.*; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Random; + +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.waiters.WaiterResponse; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.S3Object; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.CompletedPart; +import software.amazon.awssdk.services.s3.model.CreateBucketConfiguration; +import software.amazon.awssdk.services.s3.model.UploadPartRequest; +import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; +import software.amazon.awssdk.services.s3.waiters.S3Waiter; +import software.amazon.awssdk.services.s3.model.HeadBucketRequest; +import software.amazon.awssdk.services.s3.model.HeadBucketResponse; +/* + Define an audio format of the sound source to be captured, using the class AudioFormat. + Create a DataLine.Info object to hold information of a data line. + Obtain a TargetDataLine object which represents an input data line from which audio data can be captured, using the method getLineInfo(DataLine.Info) of the AudioSystem class. + Open and start the target data line to begin capturing audio data. + Create an AudioInputStream object to read data from the target data line. + Record the captured sound into a WAV file using the following method of the class AudioSystem: + write(AudioInputStream, AudioFileFormat.Type, File) + Note that this method blocks the current thread until the target data line is closed. + + Stop and close the target data line to end capturing and recording. + + */ +public class SoundRecorder { + // record duration, in milliseconds + static final long RECORD_TIME = 10000; // 1 minute + + // path of the wav file + private File wavFile; + + // format of audio file + AudioFileFormat.Type fileType = AudioFileFormat.Type.WAVE; + + // the line from which audio data is captured + TargetDataLine line; + AudioFormat format; + private String sessionId; + + private S3Client s3; + private static final Region region = Region.AWS_GLOBAL; + private String bucketName = "demo-bucket-lex"; + +// private ArrayList completedParts; +// private int completedPartsCount; +// private String uploadId; + + + + SoundRecorder(String sessionId){ + + this.wavFile = new File(String.format("voice_sig_files/%s.wav",sessionId)); + this.sessionId = sessionId; + + } + /** + * Defines an audio format + */ + AudioFormat getAudioFormat() { + float sampleRate = 8000; + int sampleSizeInBits = 16; + int channels = 1; + boolean signed = true; + boolean bigEndian = false; + AudioFormat format = new AudioFormat(sampleRate, sampleSizeInBits, + channels, signed, bigEndian); + return format; + } + + File getWavFile(){ + return this.wavFile; + } + void bootUp(){ + try { + this.format = getAudioFormat(); + DataLine.Info info = new DataLine.Info(TargetDataLine.class, this.format); + + // checks if system supports the data line + if (!AudioSystem.isLineSupported(info)) { + System.out.println("Line not supported"); + System.exit(0); + } + this.line = (TargetDataLine) AudioSystem.getLine(info); + } catch (LineUnavailableException ex) { + ex.printStackTrace(); + } + } + + /** + * Captures the sound and record into a WAV file + */ + void start() { + try { + this.line.open(this.format); + this.line.start(); // start capturing + + System.out.println("Start capturing user's voice signature..."); + + AudioInputStream ais = new AudioInputStream(this.line); + + System.out.println("Start recording user's voice signature..."); + + // start recording + AudioSystem.write(ais, fileType, wavFile); + + + } catch (IOException ioe) { + ioe.printStackTrace(); + } + catch (LineUnavailableException ex) { + ex.printStackTrace(); + } + } + + /** + * Closes the target data line to finish capturing and recording + */ + void finish() { + //line.stop(); + line.close(); + System.out.println("Not recording user's voice signature currently."); + } + + + + private static ByteBuffer getRandomByteBuffer(int size) throws IOException { + byte[] b = new byte[size]; + new Random().nextBytes(b); + return ByteBuffer.wrap(b); + } + + /** + * Entry to run the program + */ + public static void main(String[] args) { + SoundRecorder recorder = new SoundRecorder("4"); + + Thread stopper = new Thread(new Runnable() { + public void run() { + try { + Thread.sleep(RECORD_TIME); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + recorder.finish(); + } + }); + + stopper.start(); + + // start recording + recorder.bootUp(); + recorder.start(); + // creates a new thread that waits for a specified + // of time before stopping +// Thread stopper = new Thread(new Runnable() { +// public void run() { +// try { +// Thread.sleep(RECORD_TIME); +// } catch (InterruptedException ex) { +// ex.printStackTrace(); +// } +// recorder.finish(); +// } +// }); +// +// stopper.start(); +// +// // start recording +// recorder.bootUp(); +// recorder.start(); + } }