package org.openimaj.rdf.storm.tool.monitor;

import backtype.storm.Config;
import backtype.storm.spout.AckStats;
import backtype.storm.spout.KestrelThriftClient;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import net.lag.kestrel.thrift.Item;
import net.spy.memcached.AddrUtil;
import net.spy.memcached.MemcachedClient;
import org.apache.log4j.Logger;
import org.apache.thrift7.TException;
import org.openimaj.kestrel.KestrelServerSpec;
import org.openimaj.rdf.storm.tool.ReteStormOptions;
import org.openimaj.time.Timer;

/* loaded from: input_file:org/openimaj/rdf/storm/tool/monitor/KestrelQueueStatsMonitorMode.class */
public class KestrelQueueStatsMonitorMode extends MonitorMode {
    /** Name of the input queue whose statistics are monitored; set in {@link #init}. */
    private String inputQueue;
    /** Name of the output queue whose statistics are monitored; set in {@link #init}. */
    private String outputQueue;
    /** Client used to fetch the per-server statistics maps. */
    private MemcachedClient client;
    /** Started the first time progress is observed; {@code null} until then. */
    private Timer timer;
    /** Destination for the JSON progress lines. */
    private PrintWriter monitorWriter;
    /** Name of the acknowledgement queue, or {@code null} when none is configured. */
    private String ackQueue;
    /** Supplies the thrift client used to read the acknowledgement queue. */
    private Iterator<KestrelThriftClient> thriftClientIterator;
    /**
     * Stop request set by {@link #close()} and polled by {@link #run()}. Declared
     * {@code volatile} so that a write from the closing thread is guaranteed to be
     * seen by the polling thread.
     */
    private volatile boolean forceShutDown;
    private static final Logger logger = Logger.getLogger(KestrelQueueStatsMonitorMode.class);
    private static final Gson gson = new GsonBuilder().create();
    /** Pattern of a statistic key: queue name first, statistic name second. */
    String queueFormatString = "queue_%s_%s";
    /** Acknowledgement statistics with the greatest timestamp seen so far. */
    private AckStats recentAckStats = null;

    /**
     * Per-queue statistics read from the statistics map of each server.
     *
     * <p>The constant names are used verbatim as the trailing component of the
     * statistic key (see {@code queueFormatString}), so they must not be renamed.
     */
    public enum MemcachedStats {
        age,
        bytes,
        canceled_transactions,
        discarded,
        expired_items,
        items,
        logsize,
        mem_bytes,
        mem_items,
        open_transactions,
        total_flushes,
        total_items,
        transactions,
        waiters;

        /**
         * Returns all constants in declaration order.
         *
         * @return a fresh array on every call; callers may modify it freely
         */
        public static MemcachedStats[] valuesCustom() {
            // values() already hands out a new array each time, so no extra copy is needed.
            return values();
        }
    }

    /**
     * Polls the queue statistics about once a second and writes a progress report for
     * each poll, until the input queue has been fully consumed, the thread is
     * interrupted, or {@link #close()} is called.
     *
     * <p>Progress is the consumed fraction of the input queue, summed over all servers:
     * {@code (total_items - items) / total_items}. The timer starts on the first poll
     * that shows progress; reports are written on subsequent polls. The loop ends once
     * progress reaches 1 and the poll retrieved no acknowledgement statistics.
     *
     * <p>The writer and the client are always released on exit, including when a
     * statistic cannot be parsed.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            while (!this.forceShutDown) {
                Map<SocketAddress, Map<String, String>> stats = this.client.getStats();
                Map<SocketAddress, Map<MemcachedStats, String>> inputStats = extractQueueStats(this.inputQueue, stats);
                Map<SocketAddress, Map<MemcachedStats, String>> outputStats = extractQueueStats(this.outputQueue, stats);
                List<AckStats> ackBatch = extractAckStats(this.ackQueue, stats);
                boolean ackBatchEmpty = ackBatch.isEmpty();
                AckStats latestAck = mostRecentAckStats(ackBatch);

                float inputRemaining = 0.0f;
                float inputTotal = 0.0f;
                float outputTotal = 0.0f;
                for (SocketAddress server : stats.keySet()) {
                    Map<MemcachedStats, String> in = inputStats.get(server);
                    Map<MemcachedStats, String> out = outputStats.get(server);
                    // Parsed as long so counters beyond the int range do not abort the monitor.
                    inputRemaining += Long.parseLong(in.get(MemcachedStats.items));
                    inputTotal += Long.parseLong(in.get(MemcachedStats.total_items));
                    outputTotal += Long.parseLong(out.get(MemcachedStats.total_items));
                }

                float consumed = inputTotal - inputRemaining;
                // 0/0 gives NaN while nothing has been queued; NaN > 0 is false, so nothing is reported.
                float progress = consumed / inputTotal;
                if (progress > 0.0f) {
                    if (this.timer == null) {
                        this.timer = Timer.timer();
                    } else {
                        // Assumes duration() is in milliseconds, as the division by 1000 implies.
                        // Divide in floating point so partial seconds are not truncated away.
                        float elapsedSeconds = (float) (this.timer.duration() / 1000.0);
                        float throughput = elapsedSeconds > 0.0f ? consumed / elapsedSeconds : 0.0f;
                        reportTime(inputTotal, outputTotal, progress, throughput, latestAck);
                        if (progress == 1.0f && ackBatchEmpty) {
                            break;
                        }
                    }
                }

                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    // Treat interruption as a stop request and keep the flag set for the caller.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            logger.info("Closing kestrel queue monitor");
            this.monitorWriter.flush();
            this.monitorWriter.close();
            this.client.shutdown();
        }
    }

    /**
     * Writes one progress report as a single JSON line to the monitor output and
     * mirrors it to the debug log.
     *
     * @param f        value reported as {@code inputTotal}
     * @param f2       value reported as {@code outputGenerated}
     * @param f3       value reported as {@code progress}
     * @param f4       value reported as {@code spout_throughput}
     * @param ackStats source of {@code ack_throughput}; the entry is omitted when {@code null}
     */
    private void reportTime(float f, float f2, float f3, float f4, AckStats ackStats) {
        Map<String, String> report = new HashMap<String, String>();
        report.put("inputTotal", String.valueOf(f));
        report.put("outputGenerated", String.valueOf(f2));
        report.put("progress", String.valueOf(f3));
        report.put("spout_throughput", String.valueOf(f4));
        if (ackStats != null) {
            report.put("ack_throughput", String.valueOf(ackStats.throughput));
        }

        String line = gson.toJson(report);
        // Flush after every line so the file can be followed while the monitor runs.
        this.monitorWriter.println(line);
        this.monitorWriter.flush();
        logger.debug(line);
    }

    /**
     * Reads a batch of items from the acknowledgement queue and decodes each one from
     * JSON into an {@link AckStats}.
     *
     * @param str name of the acknowledgement queue; {@code null} yields an empty list
     * @param map per-server statistics; not used by this method
     * @return the decoded statistics, possibly empty, never {@code null}; also empty
     *         when the queue could not be read
     */
    private List<AckStats> extractAckStats(String str, Map<SocketAddress, Map<String, String>> map) {
        List<AckStats> collected = new ArrayList<AckStats>();
        if (str == null) {
            return collected;
        }
        try {
            Iterator<?> items = this.thriftClientIterator.next().get(str, 100, 100, 0).iterator();
            while (items.hasNext()) {
                Item item = (Item) items.next();
                AckStats decoded = gson.fromJson(new InputStreamReader(new ByteArrayInputStream(item.get_data())), AckStats.class);
                // fromJson returns null for an empty payload; skip it so callers never see a null entry.
                if (decoded != null) {
                    collected.add(decoded);
                }
            }
        } catch (TException e) {
            // Best effort: keep what was decoded so far, but record why the read failed.
            logger.error("Failed to get ackknowledgement queue", e);
        }
        return collected;
    }

    /**
     * Folds a batch of acknowledgement statistics into the remembered most recent one.
     *
     * @param list newly retrieved statistics; may be empty
     * @return the statistics with the greatest timestamp seen so far across all calls,
     *         or {@code null} if none has ever been seen
     */
    private AckStats mostRecentAckStats(List<AckStats> list) {
        // An empty list simply skips the loop and returns the remembered value.
        for (AckStats candidate : list) {
            boolean isNewer = this.recentAckStats == null
                    || this.recentAckStats.timestamp < candidate.timestamp;
            if (isNewer) {
                this.recentAckStats = candidate;
            }
        }
        return this.recentAckStats;
    }

    /**
     * Picks the statistics of one queue out of the full per-server statistics.
     *
     * @param str name of the queue, inserted into {@code queueFormatString}
     * @param map full statistics as returned by the client, keyed by server address
     * @return for every server in {@code map}, a map holding a value for every
     *         {@link MemcachedStats} constant; statistics the server did not report
     *         are filled in as {@code "0"}
     */
    private Map<SocketAddress, Map<MemcachedStats, String>> extractQueueStats(String str, Map<SocketAddress, Map<String, String>> map) {
        Map<SocketAddress, Map<MemcachedStats, String>> perServer = new HashMap<SocketAddress, Map<MemcachedStats, String>>();
        for (Map.Entry<SocketAddress, Map<String, String>> server : map.entrySet()) {
            Map<MemcachedStats, String> queueStats = new HashMap<MemcachedStats, String>();
            perServer.put(server.getKey(), queueStats);
            Map<String, String> reported = server.getValue();
            for (MemcachedStats stat : MemcachedStats.values()) {
                String key = String.format(this.queueFormatString, str, stat.toString());
                String reading = reported.get(key);
                queueStats.put(stat, reading != null ? reading : "0");
            }
        }
        return perServer;
    }

    /**
     * Prepares the monitor: records the queue names, creates the clients and opens the
     * output file.
     *
     * @param reteStormOptions supplies the input and output queue names and the server list
     * @param config           supplies the acknowledgement queue name under the key
     *                         {@code topology.rete.kestrel.ack_queue}; may be absent
     * @throws IOException if the client cannot be created or the output file cannot be
     *                     opened; in the latter case the client is shut down before
     *                     the exception propagates
     */
    @Override // org.openimaj.rdf.storm.tool.monitor.MonitorMode
    public void init(ReteStormOptions reteStormOptions, Config config) throws IOException {
        this.inputQueue = reteStormOptions.inputQueue;
        this.outputQueue = reteStormOptions.outputQueue;
        this.ackQueue = (String) config.get("topology.rete.kestrel.ack_queue");
        this.thriftClientIterator = KestrelServerSpec.thriftClientIterator(reteStormOptions.kestrelSpecList);
        // NOTE(review): 22133 is presumably the port used for specs that do not name one - confirm.
        String addresses = KestrelServerSpec.kestrelAddressListAsString(reteStormOptions.kestrelSpecList, 22133);
        MemcachedClient newClient = new MemcachedClient(AddrUtil.getAddresses(addresses));
        boolean writerOpened = false;
        try {
            this.monitorWriter = new PrintWriter(new FileOutputStream(this.monitorOutput));
            writerOpened = true;
        } finally {
            // Do not leave a live client behind when the output file cannot be opened.
            if (!writerOpened) {
                newClient.shutdown();
            }
        }
        this.client = newClient;
    }

    /**
     * Asks the polling loop in {@link #run()} to stop. The loop checks the request at
     * the start of each iteration and releases the writer and client itself; this
     * method only raises the flag and returns immediately.
     */
    @Override // org.openimaj.rdf.storm.tool.monitor.MonitorMode
    public void close() {
        forceShutDown = true;
    }
}
