package com.amazonaws.fixengineonaws;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.globalaccelerator.AWSGlobalAccelerator;
import com.amazonaws.services.globalaccelerator.AWSGlobalAcceleratorClientBuilder;
import com.amazonaws.services.globalaccelerator.model.DescribeEndpointGroupRequest;
import com.amazonaws.services.globalaccelerator.model.EndpointConfiguration;
import com.amazonaws.services.globalaccelerator.model.EndpointDescription;
import com.amazonaws.services.globalaccelerator.model.UpdateEndpointGroupRequest;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import quickfix.Acceptor;
import quickfix.Application;
import quickfix.ConfigError;
import quickfix.DefaultMessageFactory;
import quickfix.DoNotSend;
import quickfix.FieldNotFound;
import quickfix.FileStoreFactory;
import quickfix.IncorrectDataFormat;
import quickfix.IncorrectTagValue;
import quickfix.Initiator;
import quickfix.InvalidMessage;
import quickfix.JdbcStoreFactory;
import quickfix.Message;
import quickfix.MessageUtils;
import quickfix.RejectLogon;
import quickfix.ScreenLogFactory;
import quickfix.Session;
import quickfix.SessionID;
import quickfix.SessionNotFound;
import quickfix.SocketAcceptor;
import quickfix.SocketInitiator;
import quickfix.UnsupportedMessageType;

/* loaded from: input_file:com/amazonaws/fixengineonaws/FixEngine.class */
/**
 * Bridges a QuickFIX session and a pair of Kafka topics.
 *
 * <p>Application messages received over FIX ({@link #fromApp}) are published to the Kafka topic
 * named by the {@code KafkaInboundTopicName} setting; records polled from the
 * {@code KafkaOutboundTopicName} topic are parsed and sent to the logged-on FIX counterparty.
 *
 * <p>A single heartbeat loop ({@link #run()}) decides whether this instance is the active engine.
 * When {@code UseJdbcHeartbeat} is {@code "true"} the decision comes from the {@code EngineStatus}
 * stored procedure; otherwise the instance promotes itself on the first iteration and stays active.
 *
 * <p>Threading: the QuickFIX callbacks ({@code onLogon}, {@code onLogout}, {@code fromApp}) are
 * invoked by QuickFIX, while everything else runs on the thread that called {@link #run()}.
 * Fields shared between the two are {@code volatile}.
 */
public class FixEngine implements Application {
    private static final Logger LOGGER = Logger.getLogger(FixEngine.class.getName());

    /** Leader-election results as produced by {@link #getLeaderStatus}. */
    private static final int LEADER_STATUS_STILL_LEADER = 1;
    private static final int LEADER_STATUS_STILL_NOT_LEADER = 0;
    private static final int LEADER_STATUS_JUST_BECAME_LEADER = -1;

    /** JDBC call syntax for the heartbeat procedure: one IN parameter followed by five OUT parameters. */
    private static final String HEARTBEAT_SPROC_CALL = "{CALL EngineStatus(?, ?, ?, ?, ?, ?)}";

    /** Host on which the Global Accelerator update is skipped (see the log message at the use site). */
    private static final String GA_UPDATE_EXCLUDED_IP = "10.130.0.66";

    private static final Duration KAFKA_POLL_TIMEOUT = Duration.ofMillis(50L);
    private static final long FIX_LOGON_POLL_MILLIS = 500L;

    /** Local IP address; used as a prefix on every log line and as the heartbeat identity. */
    private String myIp = "???";
    private String kafkaInboundTopicName;
    private FixEngineConfig fixEngineConfig;

    // Written by the heartbeat thread, read by the QuickFIX callback thread (or vice versa).
    private volatile boolean iAmTheActiveEngine = false;
    private volatile KafkaProducer<String, String> kafkaProducer = null;
    private volatile Session fixSession = null;
    private volatile SessionID fixSessionId = null;

    // Heartbeat-thread-only state.
    private KafkaConsumer<String, Object> kafkaConsumer = null;
    private Acceptor fixServer = null;
    private Initiator fixClient = null;

    private long messageCounter = 0;
    private int heartbeatSleepInterval = 0;
    private boolean dropFixMessages = false;
    private boolean dropKafkaMessages = false;

    // Timing statistics. Totals are reset when more than logStatsEvery ms passed since the
    // previous message in the same direction.
    private long logStatsEvery = 60000;
    private long lastInboundMessageTime = 0;
    private long lastOutboundMessageTime = 0;
    private long totalInboundMessageProcessingTime = 0;
    private long totalInboundKafkaProcessingTime = 0;
    private long totalOutboundMessageProcessingTime = 0;
    private long totalOutboundFixProcessingTime = 0;

    /**
     * Creates an engine and loads its configuration.
     *
     * @param configFile path handed to {@code FixEngineConfig}
     * @throws ConfigError if {@code FixEngineConfig} rejects the configuration
     */
    public FixEngine(String configFile) throws ConfigError {
        LOGGER.setLevel(Level.INFO);
        this.myIp = getMyIp();
        LOGGER.info(this.myIp + "CONSTRUCTOR: INITIALIZING CONFIG");
        try {
            this.fixEngineConfig = new FixEngineConfig(configFile, LOGGER);
        } catch (ConfigError e) {
            LOGGER.log(Level.SEVERE, this.myIp + "CONSTRUCTOR: ERROR INITIALIZING CONFIG DUE TO ERROR: " + e, e);
            throw e;
        }
    }

    /**
     * Runs the heartbeat / message-processing loop on the calling thread.
     *
     * <p>Returns only when the loop fails with a {@link ConfigError} (which is logged) or the
     * calling thread is interrupted.
     */
    public void run() {
        LOGGER.info(this.myIp + "CONSTRUCTOR: STARTING HEARTBEAT");
        try {
            heartbeatMessageProcessingLoop(this.fixEngineConfig);
        } catch (ConfigError e) {
            LOGGER.log(Level.SEVERE, this.myIp + "CONSTRUCTOR: ERROR IN HEARTBEAT DUE TO CONFIG ERROR: " + e, e);
        }
    }

    @Override
    public void onCreate(SessionID sessionID) {
        LOGGER.fine(this.myIp + "OnCreate");
    }

    /** Records the logged-on session so outbound Kafka records have a FIX target. */
    @Override
    public void onLogon(SessionID sessionID) {
        LOGGER.info(this.myIp + "OnLogon session ID: " + sessionID);
        this.fixSessionId = sessionID;
    }

    /** Clears the recorded session; outbound processing pauses until the next logon. */
    @Override
    public void onLogout(SessionID sessionID) {
        LOGGER.info(this.myIp + "OnLogout session ID: " + sessionID);
        this.fixSessionId = null;
    }

    @Override
    public void toAdmin(Message message, SessionID sessionID) {
    }

    @Override
    public void fromAdmin(Message message, SessionID sessionID) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon {
    }

    @Override
    public void toApp(Message message, SessionID sessionID) throws DoNotSend {
        LOGGER.info(this.myIp + "%%%%%%%% TOAPP: " + message);
    }

    /**
     * Publishes an application message received over FIX to the inbound Kafka topic and waits for
     * the broker acknowledgement.
     *
     * <p>Failures are logged, never propagated to QuickFIX.
     */
    @Override
    public void fromApp(Message message, SessionID sessionID) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
        LOGGER.info(this.myIp + "%%%%%%%% FROMAPP: " + message);
        long receivedAt = System.currentTimeMillis();
        if (this.dropFixMessages) {
            LOGGER.severe(this.myIp + "%%%%%%%% FROMAPP: DROPPING MESSAGE INSTEAD OF SENDING IT!");
            return;
        }
        if (!this.iAmTheActiveEngine) {
            // Logged only: during promotion the producer exists before the active flag is set,
            // and messages arriving in that window are still forwarded.
            LOGGER.fine(this.myIp + "%%%%%%%% FROMAPP: NOT ACTIVE ENGINE, DO Nothing");
        }
        LOGGER.fine(this.myIp + "********************** counter: " + this.messageCounter++);
        String fixString = message.toString();
        LOGGER.fine(this.myIp + "%%%%%%%% FROMAPP: ***SERVER FIX ENGINE*** PARSED ORDER FIX STRING: " + fixString);

        // Read once: the heartbeat thread may null the field while we are in here.
        KafkaProducer<String, String> producer = this.kafkaProducer;
        if (producer == null) {
            LOGGER.severe(this.myIp + "%%%%%%%% FROMAPP: NO KAFKA PRODUCER AVAILABLE, MESSAGE NOT FORWARDED: " + fixString);
            return;
        }
        try {
            long kafkaStartedAt = System.currentTimeMillis();
            if (kafkaStartedAt - this.lastInboundMessageTime > this.logStatsEvery) {
                LOGGER.info(this.myIp + "@@@@@@@@@@ INBOUND TIMING STATISTICS: RESETTING TOTALS SINCE IT'S BEEN OVER A MINUTE SINCE THE LAST MESSAGE");
                this.totalInboundKafkaProcessingTime = 0L;
                this.totalInboundMessageProcessingTime = 0L;
            }
            this.lastInboundMessageTime = kafkaStartedAt;
            producer.send(new ProducerRecord<>(this.kafkaInboundTopicName, fixString)).get();
            long finishedAt = System.currentTimeMillis();
            this.totalInboundKafkaProcessingTime += finishedAt - kafkaStartedAt;
            this.totalInboundMessageProcessingTime += finishedAt - receivedAt;
            LOGGER.info(this.myIp + "@@@@@@@@@@ INBOUND TIMING STATISTICS:\ttotalInboundKafkaProcessingTime:\t" + this.totalInboundKafkaProcessingTime + "\ttotalInboundMessageProcessingTime:\t" + this.totalInboundMessageProcessingTime);
        } catch (InterruptedException e) {
            LOGGER.log(Level.SEVERE, this.myIp + "%%%%%%%% FROMAPP: Exception:" + e, e);
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            LOGGER.log(Level.SEVERE, this.myIp + "%%%%%%%% FROMAPP: Exception:" + e, e);
        }
    }

    /** Returns the local host address, or a placeholder when it cannot be resolved. */
    private String getMyIp() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            LOGGER.log(Level.SEVERE, this.myIp + "ERROR: Unable to find my own IP address!" + e, e);
            return "???.???.???.???";
        }
    }

    /**
     * Switches the logger between {@code FINE} and {@code INFO}.
     *
     * @param verbose {@code true} for {@code FINE}, {@code false} for {@code INFO}
     */
    public void setLogLevel(boolean verbose) {
        Level level = verbose ? Level.FINE : Level.INFO;
        LOGGER.setLevel(level);
        LOGGER.info(this.myIp + "MAIN: SET LOG LEVEL TO " + level);
    }

    /** Loads the JDBC driver class by name; a missing class is logged, not thrown. */
    private void loadJdbcClass(String driverClassName) {
        try {
            Class.forName(driverClassName);
            LOGGER.fine(this.myIp + "LOADED JDBC DRIVER:" + driverClassName);
        } catch (ClassNotFoundException e) {
            LOGGER.log(Level.SEVERE, this.myIp + "UNABLE TO LOAD JDBC DRIVER:" + driverClassName, e);
        }
    }

    /**
     * Opens a JDBC connection.
     *
     * @return the connection, or {@code null} if it could not be established (already logged)
     */
    private Connection getSqlDbConnection(String jdbcUrl, String user, String password) {
        LOGGER.info(this.myIp + "*********************GET SQL DB CONNECTION starting, using JDBC URL " + jdbcUrl + " WITH USER " + user + " AND PASSWORD WHICH IS A SECRET ");
        try {
            Connection connection = DriverManager.getConnection(jdbcUrl, user, password);
            LOGGER.fine(this.myIp + "****GET SQL DB CONNECTION: GOT SQL CONNECTION");
            if (connection != null) {
                LOGGER.info(this.myIp + "****GET SQL DB CONNECTION: Database connection established");
            }
            return connection;
        } catch (Exception e) {
            LOGGER.log(Level.SEVERE, this.myIp + "****GET SQL DB CONNECTION: EXCEPTION: " + e, e);
            return null;
        }
    }

    /** Creates a String/String producer against the given bootstrap servers. */
    private KafkaProducer<String, String> startKafkaProducer(String bootstrapServers) {
        LOGGER.info(this.myIp + "****START KAFKA OUTBOUND PRODUCER START*****");
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(properties);
    }

    /** Creates a consumer in the given group and subscribes it to {@code topic}. */
    private KafkaConsumer<String, Object> startKafkaConsumer(String bootstrapServers, String groupId, String topic) {
        LOGGER.info(this.myIp + "****KAFKA INBOUND CONSUMER START*****");
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("group.id", groupId);
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "earliest");
        KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }

    /**
     * Parses a raw FIX string using the current FIX session.
     *
     * @param fixString raw FIX message text
     * @return the parsed message, or {@code null} if there is no FIX session yet, the input is
     *     {@code null}, or the text is not a valid message (all logged)
     */
    public Message parseOrder(String fixString) {
        Session session = this.fixSession;
        if (session == null || fixString == null) {
            LOGGER.severe(this.myIp + "ERROR PARSING MESSAGE: " + fixString);
            return null;
        }
        try {
            Message parsed = MessageUtils.parse(session, fixString);
            LOGGER.info(this.myIp + "****PROCESS KAFKA MSGS: PARSED   MESSAGE: " + parsed);
            return parsed;
        } catch (InvalidMessage e) {
            LOGGER.log(Level.SEVERE, this.myIp + "ERROR PARSING MESSAGE: " + fixString, e);
            return null;
        }
    }

    /** Parses one Kafka record and sends it to the logged-on FIX session; failures are logged. */
    private void processOneInboundKafkaMessage(ConsumerRecord<String, Object> consumerRecord) {
        Object payload = consumerRecord.value();
        LOGGER.info(this.myIp + "****PROCESS ONE KAFKA MESSAGE: processing " + payload);
        if (this.dropKafkaMessages) {
            LOGGER.severe(this.myIp + "****PROCESS ONE KAFKA MESSAGE: DROPPING MESSAGE INSTEAD OF SENDING IT!");
            return;
        }
        if (payload == null) {
            LOGGER.severe(this.myIp + "****PROCESS ONE KAFKA MESSAGE: RECORD HAS NO VALUE, SKIPPING");
            return;
        }
        Message fixMessage = parseOrder(payload.toString());
        if (fixMessage == null) {
            // parseOrder already logged the reason.
            return;
        }
        // Read once: a concurrent logout may clear the field.
        SessionID target = this.fixSessionId;
        if (target == null) {
            LOGGER.severe(this.myIp + "****PROCESS KAFKA MSGS: NO FIX SESSION LOGGED ON, MESSAGE NOT SENT: " + fixMessage);
            return;
        }
        try {
            LOGGER.info(this.myIp + "****PROCESS KAFKA MSGS: SENDING MESSAGE TO FIX: " + fixMessage);
            if (!Session.sendToTarget(fixMessage, target)) {
                LOGGER.severe(this.myIp + "****PROCESS KAFKA MSGS: FIX SESSION DID NOT ACCEPT MESSAGE: " + fixMessage);
            }
        } catch (SessionNotFound e) {
            // Must precede the generic handler: SessionNotFound is an Exception subtype.
            LOGGER.log(Level.SEVERE, this.myIp + "****PROCESS KAFKA MSGS: SessionNotFound: " + e, e);
        } catch (Exception e) {
            LOGGER.log(Level.SEVERE, this.myIp + "****PROCESS KAFKA MSGS: Exception: " + e, e);
        }
    }

    /**
     * Polls the outbound topic once and forwards every record to FIX.
     *
     * <p>Does nothing unless this engine is active, a FIX session is logged on and a consumer
     * exists.
     */
    private void processInboundKafkaMsgs(KafkaConsumer<String, Object> consumer) {
        LOGGER.info(this.myIp + "****PROCESS KAFKA MSGS: ************* after calling getKafkaConsumer ");
        if (!this.iAmTheActiveEngine || this.fixSessionId == null || consumer == null) {
            return;
        }
        ConsumerRecords<String, Object> records = consumer.poll(KAFKA_POLL_TIMEOUT);
        long batchStartedAt = System.currentTimeMillis();
        if (records.count() == 0) {
            LOGGER.info(this.myIp + "****PROCESS KAFKA MSGS: nothing to read from Kafka");
            return;
        }
        if (batchStartedAt - this.lastOutboundMessageTime > this.logStatsEvery) {
            LOGGER.info(this.myIp + "@@@@@@@@@@ OUTBOUND TIMING STATISTICS: RESETTING TOTALS SINCE IT'S BEEN OVER A MINUTE SINCE THE LAST MESSAGE");
            this.totalOutboundFixProcessingTime = 0L;
            this.totalOutboundMessageProcessingTime = 0L;
        }
        this.lastOutboundMessageTime = batchStartedAt;
        LOGGER.info(this.myIp + "****PROCESS KAFKA MSGS: got some messages from Kafka");
        int messageCount = 0;
        for (ConsumerRecord<String, Object> record : records) {
            messageCount++;
            LOGGER.fine(this.myIp + messageCount + ": " + record.value());
            long sendStartedAt = System.currentTimeMillis();
            processOneInboundKafkaMessage(record);
            long sendFinishedAt = System.currentTimeMillis();
            this.totalOutboundFixProcessingTime += sendFinishedAt - sendStartedAt;
            this.totalOutboundMessageProcessingTime += sendFinishedAt - batchStartedAt;
            LOGGER.info(this.myIp + "@@@@@@@@@@ OUTBOUND TIMING STATISTICS:\tmessageCount:\t" + messageCount + "\ttotalOutboundFixProcessingTime:\t" + this.totalOutboundFixProcessingTime + "\ttotalOutboundMessageProcessingTime:\t" + this.totalOutboundMessageProcessingTime);
        }
    }

    /**
     * Routes Global Accelerator traffic to this engine: the endpoint whose id equals
     * {@code myGaEndpointArn} gets weight 100 and every other endpoint in the group gets weight 0.
     *
     * <p>If the group does not contain {@code myGaEndpointArn} nothing is updated. Exceptions
     * thrown by the AWS client propagate to the caller.
     */
    private void updateGAEndpoints(String myGaEndpointGroupArn, String myGaEndpointArn) {
        LOGGER.info(this.myIp + "UPDATE GA ENDPOINT starting for myGaEndpointGroupArn: " + myGaEndpointGroupArn + " and myGaEndpointArn: " + myGaEndpointArn);
        // NOTE(review): region is pinned to us-west-2; presumably because the Global Accelerator
        // API is only served from there - confirm against the AWS documentation.
        AWSGlobalAccelerator client = AWSGlobalAcceleratorClientBuilder.standard().withRegion(Regions.US_WEST_2).build();
        try {
            List<EndpointDescription> descriptions = client.describeEndpointGroup(new DescribeEndpointGroupRequest().withEndpointGroupArn(myGaEndpointGroupArn)).getEndpointGroup().getEndpointDescriptions();
            List<EndpointConfiguration> configurations = new ArrayList<>();
            String activeEndpoint = null;
            List<String> passiveEndpoints = new ArrayList<>();
            for (EndpointDescription description : descriptions) {
                String endpointId = description.getEndpointId();
                boolean mine = endpointId != null && endpointId.equals(myGaEndpointArn);
                if (mine) {
                    activeEndpoint = endpointId;
                } else {
                    passiveEndpoints.add(endpointId);
                }
                configurations.add(new EndpointConfiguration().withEndpointId(endpointId).withWeight(mine ? 100 : 0));
                LOGGER.info(this.myIp + "MY ENDPOINT: ID: " + endpointId + " HEALTH: " + description.getHealthState() + " WEIGHT: " + description.getWeight());
            }
            LOGGER.info(this.myIp + "UPDATE GA ENDPOINT activeEndpoint: " + activeEndpoint + " passiveEndpoint: " + passiveEndpoints);
            if (activeEndpoint == null) {
                LOGGER.severe(this.myIp + "UPDATE GA ENDPOINT: myGaEndpointArn " + myGaEndpointArn + " IS NOT PART OF ENDPOINT GROUP " + myGaEndpointGroupArn + "; NOT UPDATING");
                return;
            }
            LOGGER.info(this.myIp + "UPDATE GA ENDPOINT flipping to myGaEndpointArn: " + myGaEndpointArn + " with endpointConfiguration: " + configurations);
            client.updateEndpointGroup(new UpdateEndpointGroupRequest().withEndpointGroupArn(myGaEndpointGroupArn).withEndpointConfigurations(configurations));
        } finally {
            client.shutdown();
        }
    }

    /**
     * Connects to the heartbeat database and prepares the {@code EngineStatus} call.
     *
     * @return the prepared call, or {@code null} if the connection or the statement could not be
     *     created (already logged)
     */
    private CallableStatement getHeartbeatSprocStmt(String jdbcDriver, String jdbcUrl, String user, String password) {
        LOGGER.fine(this.myIp + "*********************GET HEARTBEAT PROC STATEMENT*********************");
        loadJdbcClass(jdbcDriver);
        LOGGER.fine(this.myIp + "*****GETHEARTBEATPROCSTATEMENT: Making SQL connection");
        Connection connection = getSqlDbConnection(jdbcUrl, user, password);
        if (connection == null) {
            LOGGER.severe(this.myIp + "****GET HEARTBEAT PROC STATEMENT: NO SQL CONNECTION AVAILABLE");
            return null;
        }
        LOGGER.fine(this.myIp + "*****GETHEARTBEATPROCSTATEMENT: connected to SQL DB");
        try {
            CallableStatement statement = connection.prepareCall(HEARTBEAT_SPROC_CALL);
            statement.setString(1, this.myIp);
            statement.registerOutParameter(2, java.sql.Types.INTEGER);
            statement.registerOutParameter(3, java.sql.Types.VARCHAR);
            statement.registerOutParameter(4, java.sql.Types.TIMESTAMP);
            statement.registerOutParameter(5, java.sql.Types.TIMESTAMP);
            statement.registerOutParameter(6, java.sql.Types.INTEGER);
            LOGGER.fine(this.myIp + "****GETHEARTBEATPROCSTATEMENT: SPROC PREPARED STATEMENT CREATED");
            return statement;
        } catch (SQLException e) {
            LOGGER.log(Level.SEVERE, this.myIp + "****GET HEARTBEAT PROC STATEMENT: EXCEPTION: " + e, e);
            try {
                // The statement is unusable, so do not leave its connection open.
                connection.close();
            } catch (SQLException closeError) {
                LOGGER.log(Level.WARNING, this.myIp + "****GET HEARTBEAT PROC STATEMENT: ERROR CLOSING CONNECTION: " + closeError, closeError);
            }
            return null;
        }
    }

    /** Builds the heartbeat statement from the JDBC settings in {@code config}. */
    private CallableStatement openHeartbeatStatement(FixEngineConfig config) throws ConfigError {
        return getHeartbeatSprocStmt(config.getSessionSetting("JdbcDriver"), config.getSessionSetting("JdbcURL"), config.getSessionSetting("JdbcUser"), config.getSessionSetting("JdbcPassword"));
    }

    /** Closes a heartbeat statement together with the connection it was created on. */
    private void closeHeartbeatStatement(CallableStatement statement) {
        if (statement == null) {
            return;
        }
        try {
            // Fetch the connection first: it is not available from a closed statement.
            Connection connection = statement.getConnection();
            statement.close();
            if (connection != null) {
                connection.close();
            }
        } catch (SQLException e) {
            LOGGER.log(Level.WARNING, this.myIp + "****HEARTBEAT: ERROR CLOSING HEARTBEAT STATEMENT: " + e, e);
        }
    }

    /**
     * Determines this engine's leader status.
     *
     * @param statement prepared {@code EngineStatus} call; only used when {@code useJdbcHeartbeat}
     * @param currentlyActive whether this engine currently considers itself active
     * @param useJdbcHeartbeat {@code true} to ask the database, {@code false} to self-elect
     * @return one of the {@code LEADER_STATUS_*} values
     * @throws SQLException if the database call fails or no statement is available
     */
    private int getLeaderStatus(CallableStatement statement, boolean currentlyActive, boolean useJdbcHeartbeat) throws SQLException {
        if (!useJdbcHeartbeat) {
            int status = currentlyActive ? LEADER_STATUS_STILL_LEADER : LEADER_STATUS_JUST_BECAME_LEADER;
            LOGGER.info(this.myIp + "****HEARTBEAT: NO SQL CONNECTION. DEFAULT LEADER STATUS: " + status);
            return status;
        }
        if (statement == null) {
            // Reported as SQLException so the caller's reconnect path handles it.
            throw new SQLException("No heartbeat statement available");
        }
        try {
            statement.executeQuery();
            int status = statement.getInt(2);
            LOGGER.info(this.myIp + "****HEARTBEAT: SQL SPROC SAYS: leaderStatus: " + status + "; lastIpAdd: " + statement.getString(3) + "; lastTimestamp: " + statement.getTimestamp(4) + "; timeNow: " + statement.getTimestamp(5) + "; timeDiffSec: " + statement.getInt(6));
            return status;
        } catch (SQLException e) {
            LOGGER.log(Level.SEVERE, this.myIp + "HEARTBEAT: Exception executing SQL SPROC: " + e, e);
            throw e;
        }
    }

    /** Picks the JDBC or file message store according to the {@code UseJdbcMessageStore} setting. */
    private quickfix.MessageStoreFactory createMessageStoreFactory(FixEngineConfig config) throws ConfigError {
        if ("true".equals(config.getSessionSetting("UseJdbcMessageStore"))) {
            return new JdbcStoreFactory(config.getSessionSettings());
        }
        return new FileStoreFactory(config.getSessionSettings());
    }

    /** Looks up the first configured session; fails with {@link ConfigError} if there is none. */
    private Session lookupFirstSession(List<SessionID> sessionIds) throws ConfigError {
        if (sessionIds == null || sessionIds.isEmpty()) {
            throw new ConfigError("No FIX session is configured");
        }
        return Session.lookupSession(sessionIds.get(0));
    }

    /** Creates the acceptor if needed and starts it unless a FIX session object already exists. */
    private void startFixServer(FixEngineConfig config) throws ConfigError {
        LOGGER.info(this.myIp + "****STARTING FIX SERVER APPLICATION");
        if (this.fixServer != null) {
            LOGGER.info(this.myIp + "START FIX SERVER: FIX_SERVER object already exists!");
        } else {
            this.fixServer = new SocketAcceptor(this, createMessageStoreFactory(config), config.getSessionSettings(), new ScreenLogFactory(true, true, true), new DefaultMessageFactory());
            LOGGER.info(this.myIp + "START FIX SERVER: FIX_SERVER object created: " + this.fixServer);
        }
        if (this.fixSession != null) {
            LOGGER.info(this.myIp + "START FIX SERVER: FIX_INBOUND_SESSION object already exists!");
            return;
        }
        this.fixServer.start();
        this.fixSession = lookupFirstSession(this.fixServer.getSessions());
        LOGGER.info(this.myIp + "START FIX SERVER: FIX_INBOUND_SESSION object created: " + this.fixSession);
    }

    /**
     * Creates the initiator if needed, starts it, and blocks until {@link #onLogon} has recorded a
     * session. Returns early (with the interrupt flag restored) if the thread is interrupted.
     */
    private void startFixClient(FixEngineConfig config) throws ConfigError {
        LOGGER.info(this.myIp + "****STARTING FIX CLIENT APPLICATION");
        if (this.fixClient != null) {
            LOGGER.info(this.myIp + "START FIX CLIENT: FIX_CLIENT object already exists!");
        } else {
            this.fixClient = new SocketInitiator(this, createMessageStoreFactory(config), config.getSessionSettings(), new ScreenLogFactory(true, true, true), new DefaultMessageFactory());
            LOGGER.info(this.myIp + "START FIX CLIENT: FIX_CLIENT object created: " + this.fixClient);
        }
        if (this.fixSessionId != null) {
            LOGGER.info(this.myIp + "START FIX CLIENT: FIX_OUTBOUND_SESSION_ID object already exists!");
            return;
        }
        this.fixClient.start();
        this.fixSession = lookupFirstSession(this.fixClient.getSessions());
        LOGGER.info(this.myIp + "START FIX CLIENT: FIX_OUTBOUND_SESSION object created: " + this.fixSession);
        while (this.fixSessionId == null) {
            LOGGER.info(this.myIp + "****QUICKFIX CLIENT START: WAITING FOR SERVER...");
            try {
                Thread.sleep(FIX_LOGON_POLL_MILLIS);
            } catch (InterruptedException e) {
                LOGGER.info(this.myIp + "****QUICKFIX CLIENT START: FixEngine THREAD INTERRUPTED: " + e);
                Thread.currentThread().interrupt();
                return;
            }
        }
        LOGGER.info(this.myIp + "START FIX CLIENT: FIX_OUTBOUND_SESSION_ID connection established: " + this.fixSessionId);
    }

    /** Stops the acceptor, if any, and forgets the FIX session object. */
    private void stopFixServer() {
        LOGGER.info(this.myIp + "****STOPPING FIX SERVER APPLICATION");
        if (this.fixServer != null) {
            this.fixServer.stop();
        }
        this.fixSession = null;
        this.fixServer = null;
    }

    /** Stops the initiator, if any, and forgets the FIX session and session id. */
    private void stopFixClient() {
        LOGGER.info(this.myIp + "****STOPPING FIX CLIENT APPLICATION");
        if (this.fixClient != null) {
            this.fixClient.stop();
        }
        this.fixSessionId = null;
        this.fixClient = null;
        this.fixSession = null;
    }

    /** Closes and forgets both Kafka clients; close failures are logged, not thrown. */
    private void closeKafkaClients() {
        KafkaConsumer<String, Object> consumer = this.kafkaConsumer;
        KafkaProducer<String, String> producer = this.kafkaProducer;
        this.kafkaConsumer = null;
        this.kafkaProducer = null;
        if (consumer != null) {
            try {
                consumer.close();
            } catch (RuntimeException e) {
                LOGGER.log(Level.WARNING, this.myIp + "****HEARTBEAT: ERROR CLOSING KAFKA CONSUMER: " + e, e);
            }
        }
        if (producer != null) {
            try {
                producer.close();
            } catch (RuntimeException e) {
                LOGGER.log(Level.WARNING, this.myIp + "****HEARTBEAT: ERROR CLOSING KAFKA PRODUCER: " + e, e);
            }
        }
    }

    /**
     * Heartbeat loop: on every iteration determine the leader status, start or stop the Kafka and
     * FIX endpoints accordingly, then forward one batch of outbound Kafka records.
     *
     * <p>A failed status query is treated as "not leader" for that iteration and the heartbeat
     * connection is rebuilt. The loop ends when the thread is interrupted.
     */
    private void heartbeatMessageProcessingLoop(FixEngineConfig config) throws ConfigError {
        LOGGER.info(this.myIp + "*****START HEARTBEAT *****");
        boolean iAmClientFixEngine = "initiator".equals(config.getSessionSetting("ConnectionType"));
        String kafkaBrokers = config.getSessionSetting("kafkaBootstrapBrokerString");
        String consumerGroupId = config.getSessionSetting("KafkaConsumerGroupID");
        String outboundTopic = config.getSessionSetting("KafkaOutboundTopicName");
        this.kafkaInboundTopicName = config.getSessionSetting("KafkaInboundTopicName");
        // Global Accelerator settings are only read for the acceptor side.
        String gaEndpointGroupArn = iAmClientFixEngine ? null : config.getSessionSetting("GAEndpointGroupArn");
        String gaEndpointArn = iAmClientFixEngine ? null : config.getSessionSetting("GAEndpointArn");
        boolean useJdbcHeartbeat = "true".equals(config.getSessionSetting("UseJdbcHeartbeat"));

        CallableStatement heartbeatStatement = null;
        if (useJdbcHeartbeat) {
            config.addSqlDbConnectionCoordinatesToSettings(config.getSessionSetting("RDSClusterSecretArn"));
            heartbeatStatement = openHeartbeatStatement(config);
        }
        if (!iAmClientFixEngine) {
            startFixServer(config);
        }

        while (!Thread.currentThread().isInterrupted()) {
            LOGGER.info(this.myIp + "**************** HEARTBEAT: iAmClientFixEngine: " + iAmClientFixEngine + " ; IM_AM_THE_ACTIVE_ENGINE: " + this.iAmTheActiveEngine);
            int leaderStatus = LEADER_STATUS_STILL_NOT_LEADER;
            try {
                leaderStatus = getLeaderStatus(heartbeatStatement, this.iAmTheActiveEngine, useJdbcHeartbeat);
            } catch (SQLException e) {
                LOGGER.log(Level.SEVERE, this.myIp + "****HEARTBEAT: ***ERROR GETTING LEADER STATUS!*** " + e, e);
                LOGGER.severe(this.myIp + "****HEARTBEAT: ***RECREATING HEARTBEAT CONNECTION TO ATTEMPT TO RECOVER!***");
                closeHeartbeatStatement(heartbeatStatement);
                heartbeatStatement = openHeartbeatStatement(config);
            }
            LOGGER.info(this.myIp + "**************** HEARTBEAT: iAmClientFixEngine: " + iAmClientFixEngine + " ; IM_AM_THE_ACTIVE_ENGINE: " + this.iAmTheActiveEngine + " ; leaderStatus: " + leaderStatus);

            if (leaderStatus == LEADER_STATUS_JUST_BECAME_LEADER) {
                LOGGER.info(this.myIp + "****HEARTBEAT: ***I'M STILL LEADER OR JUST BECAME LEADER! ENSURING ENGINES ARE RUNNING!***");
                if (this.kafkaConsumer == null) {
                    this.kafkaConsumer = startKafkaConsumer(kafkaBrokers, consumerGroupId, outboundTopic);
                }
                if (this.kafkaProducer == null) {
                    this.kafkaProducer = startKafkaProducer(kafkaBrokers);
                }
                if (iAmClientFixEngine) {
                    startFixClient(config);
                } else {
                    LOGGER.info(this.myIp + "**************** HEARTBEAT: I AM Server ENGINE***********");
                    startFixServer(config);
                    if (GA_UPDATE_EXCLUDED_IP.equals(this.myIp)) {
                        LOGGER.severe(this.myIp + "**************** HEARTBEAT: NOT UPDATING GLOBAL ACCELERATOR ENDPOINT BECAUSE WE DONT HAVE ACCESS FROM THIS MACHINE!!!***********");
                    } else {
                        updateGAEndpoints(gaEndpointGroupArn, gaEndpointArn);
                    }
                }
                this.iAmTheActiveEngine = true;
            } else if (leaderStatus == LEADER_STATUS_STILL_LEADER) {
                LOGGER.info(this.myIp + "****HEARTBEAT: ***STILL LEADER! Keep listening!***");
            } else if (leaderStatus == LEADER_STATUS_STILL_NOT_LEADER) {
                LOGGER.info(this.myIp + "****HEARTBEAT: ***STILL NOT LEADER!***");
                closeKafkaClients();
                stopFixClient();
                // Without this a former leader would keep polling after losing leadership.
                this.iAmTheActiveEngine = false;
            }

            processInboundKafkaMsgs(this.kafkaConsumer);

            if (this.heartbeatSleepInterval > 0) {
                try {
                    Thread.sleep(this.heartbeatSleepInterval);
                } catch (InterruptedException e) {
                    LOGGER.severe(this.myIp + "HEARTBEAT THREAD INTERRUPTED: " + e);
                    Thread.currentThread().interrupt();
                }
            }
        }
        closeHeartbeatStatement(heartbeatStatement);
    }

    /**
     * Entry point.
     *
     * @param strArr optional single argument: the config file path (default
     *     {@code config/server_test.cfg})
     * @throws ConfigError if the configuration cannot be loaded
     */
    public static void main(String[] strArr) throws ConfigError {
        String configFile = strArr.length > 0 ? strArr[0] : "config/server_test.cfg";
        System.out.println("***MAIN STARTING WITH CONFIG FILE: " + configFile);
        new FixEngine(configFile).run();
        try {
            new CountDownLatch(1).await();
        } catch (InterruptedException e) {
            System.out.println("MAIN: FINAL WAIT INTERRUPTED: " + e);
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
        System.out.println("MAIN: GOT TO THE END! EXITING!");
    }
}
