package com.google.devtools.mobileharness.infra.controller.messaging;

import com.google.common.base.Preconditions;
import com.google.common.flogger.FluentLogger;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.devtools.mobileharness.api.messaging.MessageDestinationNotFoundException;
import com.google.devtools.mobileharness.api.messaging.proto.MessagingProto;
import com.google.devtools.mobileharness.shared.util.base.ProtoTextFormat;
import com.google.devtools.mobileharness.shared.util.concurrent.Callables;
import com.google.devtools.mobileharness.shared.util.concurrent.MoreFutures;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import javax.inject.Inject;

/* loaded from: input_file:com/google/devtools/mobileharness/infra/controller/messaging/MessagingManager.class */
public class MessagingManager {
    private static final FluentLogger logger = FluentLogger.forEnclosingClass();
    private final MessageSenderFinder messageSenderFinder;
    private final ListeningExecutorService threadPool;

    /* loaded from: input_file:com/google/devtools/mobileharness/infra/controller/messaging/MessagingManager$MessageReceptionsConsumer.class */
    private static class MessageReceptionsConsumer implements Callable<Void> {
        private final String messageId;
        private final Consumer<MessagingProto.MessageReceptions> messageReceptionsHandler;
        private final BlockingQueue<Optional<MessagingProto.MessageReceptions>> queue;

        private MessageReceptionsConsumer(String str, Consumer<MessagingProto.MessageReceptions> consumer, BlockingQueue<Optional<MessagingProto.MessageReceptions>> blockingQueue) {
            this.messageId = str;
            this.messageReceptionsHandler = consumer;
            this.queue = blockingQueue;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws InterruptedException {
            boolean z;
            MessagingProto.MessageReceptions build;
            do {
                Optional<MessagingProto.MessageReceptions> take = this.queue.take();
                if (take.isEmpty()) {
                    break;
                }
                MessagingProto.MessageReceptions messageReceptions = take.get();
                z = false;
                ArrayList arrayList = null;
                while (true) {
                    Optional<MessagingProto.MessageReceptions> poll = this.queue.poll();
                    if (poll == null) {
                        break;
                    }
                    if (poll.isEmpty()) {
                        z = true;
                    } else {
                        if (arrayList == null) {
                            arrayList = new ArrayList();
                        }
                        arrayList.add(poll.get());
                    }
                }
                if (arrayList == null) {
                    build = messageReceptions;
                } else {
                    MessagingProto.MessageReceptions.Builder addAllReceptions = MessagingProto.MessageReceptions.newBuilder().addAllReceptions(messageReceptions.getReceptionsList());
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        addAllReceptions.addAllReceptions(((MessagingProto.MessageReceptions) it.next()).getReceptionsList());
                    }
                    build = addAllReceptions.build();
                }
                try {
                    this.messageReceptionsHandler.accept(build);
                } catch (Error | RuntimeException e) {
                    MessagingManager.logger.atInfo().withCause(e).log("Error when handling message receptions, message_id=[%s], message_receptions=[%s]", this.messageId, ProtoTextFormat.shortDebugString(build));
                }
            } while (!z);
            MessagingManager.logger.atInfo().log("Finished consuming message receptions, message_id=[%s]", this.messageId);
            return null;
        }
    }

    /* loaded from: input_file:com/google/devtools/mobileharness/infra/controller/messaging/MessagingManager$MessageReceptionsProducer.class */
    /**
     * Producer side of a message's reception queue.
     *
     * <p>Sends the message through the given {@code MessageSender}, enqueues every reception the
     * sender reports, and always finishes by enqueuing an empty {@link Optional} as the
     * end-of-stream marker, whether sending succeeded or failed.
     */
    private static class MessageReceptionsProducer implements Callable<Void> {
        private final String messageId;
        private final MessagingProto.MessageSend messageSend;
        private final MessageSender messageSender;
        private final BlockingQueue<Optional<MessagingProto.MessageReceptions>> queue;

        private MessageReceptionsProducer(String messageId, MessagingProto.MessageSend messageSend, MessageSender messageSender, BlockingQueue<Optional<MessagingProto.MessageReceptions>> queue) {
            this.messageId = messageId;
            this.messageSend = messageSend;
            this.messageSender = messageSender;
            this.queue = queue;
        }

        /**
         * Sends the message and publishes its receptions to the queue.
         *
         * @return always {@code null}
         * @throws InterruptedException if enqueuing is interrupted
         */
        @Override
        public Void call() throws InterruptedException {
            try {
                this.messageSender.sendMessage(this.messageSend, receptions -> this.queue.put(Optional.of(receptions)));
                logger.atInfo().log("Finished sending message, message_id=[%s]", this.messageId);
                return null;
            } finally {
                // Clear a pending interrupt so that put() does not reject the end-of-stream
                // marker, then restore the flag for whoever runs next on this thread.
                boolean interrupted = Thread.interrupted();
                this.queue.put(Optional.empty());
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a manager that resolves senders through {@code messageSenderFinder} and runs its
     * producer and consumer tasks on {@code listeningExecutorService}.
     *
     * @param messageSenderFinder used to look up the sender for each message
     * @param listeningExecutorService thread pool the send/consume tasks are submitted to
     */
    @Inject
    public MessagingManager(MessageSenderFinder messageSenderFinder, ListeningExecutorService listeningExecutorService) {
        this.threadPool = listeningExecutorService;
        this.messageSenderFinder = messageSenderFinder;
    }

    /**
     * Sends a message asynchronously and streams its receptions to {@code consumer}.
     *
     * <p>Two tasks are submitted to the thread pool and linked by an unbounded queue: a producer
     * that performs the send and enqueues receptions, and a consumer that dequeues them and
     * invokes {@code consumer}. This method returns once both tasks are submitted; it does not
     * wait for the send to finish. Failures of either task are handed to
     * {@code MoreFutures.logFailure} at WARNING level.
     *
     * @param messageSend the message to send; must not be null
     * @param consumer handler for the message's receptions; must not be null
     * @throws NullPointerException if either argument is null
     * @throws MessageDestinationNotFoundException presumably when no sender can be found for the
     *     message (raised by the sender lookup; confirm against {@code MessageSenderFinder})
     */
    public void sendMessage(MessagingProto.MessageSend messageSend, Consumer<MessagingProto.MessageReceptions> consumer) throws MessageDestinationNotFoundException {
        Preconditions.checkNotNull(messageSend);
        Preconditions.checkNotNull(consumer);
        // Resolve the sender first so that an unknown destination fails before any task starts.
        MessageSender messageSender = this.messageSenderFinder.findMessageSender(messageSend);
        String messageId = UUID.randomUUID().toString();

        // Shared between the two tasks; an empty Optional marks the end of the stream.
        BlockingQueue<Optional<MessagingProto.MessageReceptions>> queue = new LinkedBlockingDeque<>();
        MessageReceptionsProducer producer = new MessageReceptionsProducer(messageId, messageSend, messageSender, queue);
        MessageReceptionsConsumer receptionsConsumer = new MessageReceptionsConsumer(messageId, consumer, queue);

        logger.atInfo().log("Send message, message_id=[%s], message_send=[%s]", messageId, ProtoTextFormat.shortDebugString(messageSend));
        MoreFutures.logFailure(
                this.threadPool.submit(Callables.threadRenaming(producer, (Supplier<String>) () -> "message-sender-" + messageId)),
                Level.WARNING,
                "Fatal error in message sender [%s]",
                messageId);
        MoreFutures.logFailure(
                this.threadPool.submit(Callables.threadRenaming(receptionsConsumer, (Supplier<String>) () -> "message-receptions-consumer-" + messageId)),
                Level.WARNING,
                "Fatal error in message receptions consumer [%s]",
                messageId);
    }
}
