package net.rossinno.saymon.agent.connection;

import com.google.common.base.Optional;
import com.google.common.eventbus.EventBus;
import com.google.gson.Gson;
import com.google.gson.JsonParseException;
import com.google.gson.reflect.TypeToken;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.annotation.PreDestroy;
import net.rossinno.saymon.agent.dto.AgentTaskDto;
import net.rossinno.saymon.agent.dto.ServerMessageDto;
import net.rossinno.saymon.agent.dto.UpdateInformationDto;
import net.rossinno.saymon.agent.event.AgentTasksEvent;
import net.rossinno.saymon.agent.event.AgentUpdateEvent;
import net.rossinno.saymon.agent.event.ConfigurationChangeEvent;
import net.rossinno.saymon.agent.event.DiscoveryAckEvent;
import net.rossinno.saymon.agent.event.SensorErrorEvent;
import net.rossinno.saymon.agent.lang.InClosure;
import net.rossinno.saymon.agent.task.AgentTask;
import net.rossinno.saymon.agent.task.TaskValidationResult;
import net.rossinno.saymon.agent.update.UpdateInformation;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * {@link EventSource} that consumes the agent's message topics and republishes their payloads as
 * events on the {@link EventBus}.
 *
 * <p>On {@link #start()} it subscribes to the shared update topic and to the per-agent task and
 * event topics, then loads the last known update information and task set, asking the server for
 * them via the poll topics when no last message is available.
 */
@Component
public class KafkaAgentEventListener implements EventSource {
    private static final String AGENT_UPDATE_TOPIC = "AGENT_UPDATE";
    private static final String AGENT_TASKS_TOPIC_PREFIX = "AGENT_TASKS_";
    private static final String AGENT_EVENTS_TOPIC_PREFIX = "AGENT_EVENTS_";
    private static final String AGENT_GET_TASKS_TOPIC = "AGENT_GET_TASKS";
    private static final String AGENT_GET_UPDATES_TOPIC = "AGENT_GET_UPDATES";
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private AgentIdDataSource agentIdSource;

    @Autowired
    private Gson gson;

    @Autowired
    private EventBus eventBus;

    @Autowired
    private MessageProducerFactory messageProducerFactory;
    /** Producer used for poll requests; assigned in {@link #start()}. */
    private MessageProducer producer;

    @Autowired
    private MessageConsumerFactory messageConsumerFactory;
    /** Consumer used for fetching and listening; assigned in {@link #start()}, null before that. */
    private MessageConsumer consumer;

    /**
     * Message sent to the poll topics ({@code AGENT_GET_TASKS}, {@code AGENT_GET_UPDATES}),
     * carrying the id of the requesting agent.
     */
    public static class AgentPollMessage {
        public final String agentId;

        /**
         * @param str id of the agent issuing the poll request
         */
        AgentPollMessage(String str) {
            this.agentId = str;
        }
    }

    /** Stops the consumer on bean destruction, if {@link #start()} ever created one. */
    @PreDestroy
    private void preDestroy() {
        if (this.consumer != null) {
            this.consumer.stopListening();
        }
    }

    /**
     * Creates the producer and the per-agent consumer, registers the topic listener and then
     * loads the initial update information and task set.
     */
    @Override
    public void start() {
        String agentId = this.agentIdSource.getAgentId().checkedGet();
        this.producer = this.messageProducerFactory.getDefaultProducer();
        this.consumer = this.messageConsumerFactory.getConsumer("Agent-" + agentId, MessageConsumerUnreadPolicy.IGNORE);
        runEventLoop();
        loadInitialUpdateInformation();
        loadInitialTasks();
    }

    /**
     * Handles the last message of the update topic if there is one; otherwise sends a poll
     * request for update information.
     */
    private void loadInitialUpdateInformation() {
        Optional<String> lastUpdate = this.consumer.fetchLastMessage(AGENT_UPDATE_TOPIC);
        if (lastUpdate.isPresent()) {
            handleAgentUpdate(lastUpdate.get());
        } else {
            this.producer.send(AGENT_GET_UPDATES_TOPIC, new AgentPollMessage(this.agentIdSource.getAgentId().checkedGet()));
        }
    }

    /**
     * Handles the last message of this agent's task topic if there is one; otherwise sends a poll
     * request for tasks.
     */
    private void loadInitialTasks() {
        String agentId = this.agentIdSource.getAgentId().checkedGet();
        Optional<String> lastTasks = this.consumer.fetchLastMessage(AGENT_TASKS_TOPIC_PREFIX + agentId);
        if (lastTasks.isPresent()) {
            this.logger.info("==> Fetched initial task set: {}", lastTasks.get());
            handleAgentTasks(lastTasks.get());
        } else {
            this.logger.info("No initial task set, polling for tasks.");
            this.producer.send(AGENT_GET_TASKS_TOPIC, new AgentPollMessage(agentId));
        }
    }

    /**
     * Subscribes to the update topic and this agent's task and event topics, dispatching every
     * received message to the matching handler by topic.
     */
    private void runEventLoop() {
        String agentId = this.agentIdSource.getAgentId().checkedGet();
        List<String> topics = Arrays.asList(
                AGENT_UPDATE_TOPIC,
                AGENT_TASKS_TOPIC_PREFIX + agentId,
                AGENT_EVENTS_TOPIC_PREFIX + agentId);
        this.consumer.listen(topics, new InClosure<ServerMessage>() {
            @Override
            public void apply0(ServerMessage serverMessage) {
                String topic = serverMessage.getTopic();
                if (KafkaAgentEventListener.AGENT_UPDATE_TOPIC.equals(topic)) {
                    KafkaAgentEventListener.this.handleAgentUpdate(serverMessage.getBody());
                } else if (topic != null && topic.startsWith(KafkaAgentEventListener.AGENT_TASKS_TOPIC_PREFIX)) {
                    KafkaAgentEventListener.this.handleAgentTasks(serverMessage.getBody());
                } else if (topic != null && topic.startsWith(KafkaAgentEventListener.AGENT_EVENTS_TOPIC_PREFIX)) {
                    KafkaAgentEventListener.this.handleAgentEvent(serverMessage.getBody());
                } else {
                    // Also reached for a message without a topic, instead of failing with an NPE.
                    KafkaAgentEventListener.this.logger.warn("Unknown/unsupported message: {}", serverMessage);
                }
            }
        });
    }

    /**
     * Parses update information JSON and posts it as an {@link AgentUpdateEvent}. Unparseable
     * JSON is logged and ignored.
     *
     * @param str JSON body of an update message
     */
    public void handleAgentUpdate(String str) {
        try {
            // NOTE(review): Gson returns null for empty or literal-null input; whether
            // UpdateInformation.valueOf accepts null is not visible from here - confirm.
            UpdateInformationDto updateDto = this.gson.fromJson(str, UpdateInformationDto.class);
            this.eventBus.post(new AgentUpdateEvent(UpdateInformation.valueOf(updateDto)));
        } catch (JsonParseException e) {
            this.logger.error("Could not parse agent update information JSON {}: {}", str, ExceptionUtils.getRootCauseMessage(e));
        }
    }

    /**
     * Parses a JSON array of task definitions and posts the valid ones as a single
     * {@link AgentTasksEvent}.
     *
     * <p>A task that fails validation is logged and reported through a {@link SensorErrorEvent};
     * a task that cannot be converted at all is only logged. Neither prevents the remaining tasks
     * from being posted. If the JSON as a whole cannot be parsed, or contains no list, nothing is
     * posted.
     *
     * @param str JSON body of a tasks message
     */
    public void handleAgentTasks(String str) {
        try {
            List<AgentTaskDto> taskDtos = this.gson.fromJson(str, new TypeToken<ArrayList<AgentTaskDto>>() {
            }.getType());
            if (taskDtos == null) {
                // Gson yields null for empty or literal-null input; previously this caused an NPE.
                this.logger.error("Agent tasks JSON contains no task list, ignoring: {}", str);
                return;
            }
            List<AgentTask> validTasks = new ArrayList<AgentTask>(taskDtos.size());
            for (AgentTaskDto taskDto : taskDtos) {
                try {
                    AgentTask task = AgentTask.from(taskDto);
                    TaskValidationResult validation = task.validate();
                    if (validation.isOk()) {
                        validTasks.add(task);
                    } else {
                        this.logger.error("Agent task validation failed, task={}, error={}", taskDto, validation.getErrorMessage());
                        this.eventBus.post(new SensorErrorEvent(task, new IllegalArgumentException(validation.getErrorMessage())));
                    }
                } catch (Exception e) {
                    // One broken task must not discard the rest of the set.
                    this.logger.error("Failed to parse agent task, task=" + taskDto + ", error=", e);
                }
            }
            this.eventBus.post(new AgentTasksEvent(validTasks));
        } catch (JsonParseException e) {
            this.logger.error("Could not parse agent tasks JSON {}: {}", str, ExceptionUtils.getRootCauseMessage(e));
        } catch (IllegalArgumentException e) {
            this.logger.error("Agent task JSON validation failed {}: {}", str, ExceptionUtils.getRootCauseMessage(e));
        }
    }

    /**
     * Parses a server event and posts the matching event: {@code config-change} becomes a
     * {@link ConfigurationChangeEvent}, {@code discovery-ack} with a {@code discoveryId} becomes a
     * {@link DiscoveryAckEvent}. Anything else is logged and ignored.
     *
     * @param str JSON body of an event message
     */
    public void handleAgentEvent(String str) {
        try {
            ServerMessageDto message = this.gson.fromJson(str, ServerMessageDto.class);
            if (message == null) {
                // Gson yields null for empty or literal-null input; previously this caused an NPE.
                this.logger.error("Agent event JSON contains no message, ignoring: {}", str);
                return;
            }
            if ("config-change".equals(message.getType())) {
                this.eventBus.post(new ConfigurationChangeEvent(message.getBody()));
            } else if ("discovery-ack".equals(message.getType())) {
                String discoveryId = message.getBody() == null ? null : message.getBody().get("discoveryId");
                if (discoveryId == null) {
                    this.logger.warn("Ignoring discovery-ack event without discoveryId: {}", str);
                    return;
                }
                this.eventBus.post(new DiscoveryAckEvent(discoveryId));
            } else {
                this.logger.debug("Ignoring agent event of unsupported type: {}", str);
            }
        } catch (JsonParseException e) {
            this.logger.error("Could not parse agent event JSON {}: {}", str, ExceptionUtils.getRootCauseMessage(e));
        }
    }
}
