package org.openimaj.rdf.storm.tool;

import backtype.storm.Config;
import backtype.storm.generated.StormTopology;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.thrift7.TException;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.kohsuke.args4j.ProxyOptionHandler;
import org.openimaj.io.FileUtils;
import org.openimaj.kestrel.KestrelServerSpec;
import org.openimaj.kestrel.KestrelTupleWriter;
import org.openimaj.rdf.storm.sparql.topology.builder.datasets.StaticRDFDataset;
import org.openimaj.rdf.storm.tool.lang.RuleLanguageHandler;
import org.openimaj.rdf.storm.tool.lang.RuleLanguageMode;
import org.openimaj.rdf.storm.tool.monitor.MonitorMode;
import org.openimaj.rdf.storm.tool.monitor.MonitorModeOption;
import org.openimaj.rdf.storm.tool.source.TriplesInputMode;
import org.openimaj.rdf.storm.tool.source.TriplesInputModeOption;
import org.openimaj.rdf.storm.tool.staticdata.StaticDataMode;
import org.openimaj.rdf.storm.tool.staticdata.StaticDataModeOption;
import org.openimaj.rdf.storm.tool.topology.TopologyModeOption;
import org.openimaj.rdf.storm.utils.JenaStormUtils;
import org.openimaj.storm.tool.StormToolOptions;
import org.openimaj.storm.utils.KestrelUtils;

/* loaded from: input_file:org/openimaj/rdf/storm/tool/ReteStormOptions.class */
public class ReteStormOptions extends StormToolOptions {
    private static final Logger logger = Logger.getLogger(ReteStormOptions.class);

    @Option(name = "--topology-name", aliases = {"-tn"}, required = false, usage = "The name of the topology being submitted. If not provided defaults to <ruleLanguage>_topology_<launchTimeInMillis>", metaVar = "STRING")
    public String topologyName;

    @Option(name = "--rule-language", aliases = {"-rl"}, required = false, usage = "The language to decipher rules and construct a Rete network as a Storm Topology", handler = ProxyOptionHandler.class)
    public RuleLanguageMode ruleLanguageMode;
    public RuleLanguageHandler ruleLanguageModeOp;

    @Option(name = "--monitor-mode", aliases = {"-mm"}, required = false, usage = "A monitor is started and runs in a thread after the topology is instantiated", handler = ProxyOptionHandler.class)
    public MonitorModeOption mm;
    public MonitorMode mmOp;

    @Option(name = "--triples-input", aliases = {"-ti"}, required = false, usage = "The source of triples", handler = ProxyOptionHandler.class)
    public TriplesInputModeOption triplesInputMode;
    public TriplesInputMode triplesInputModeOp;
    private String rules;

    @Option(name = "--static-data", aliases = {"-sd"}, required = false, usage = "The source of any static data. The format this must take is name=uri at the moment.", metaVar = "STRING", multiValued = true)
    public List<String> staticDataSource;

    @Option(name = "--static-data-mode", aliases = {"-sdm"}, required = false, usage = "How static data is handed to the streaming system", handler = ProxyOptionHandler.class)
    public StaticDataModeOption sdm;
    public StaticDataMode sdmOp;

    @Option(name = "--force-feed-back", aliases = {"-ffb"}, required = false, usage = "When set forces any outputs to be streamed through again. Usually required for rule systems", multiValued = true)
    private boolean feedBack;

    @Option(name = "--kestrel-input-queue", aliases = {"-kiq"}, required = false, usage = "The input queue")
    public String inputQueue;

    @Option(name = "--kestrel-output-queue", aliases = {"-koq"}, required = false, usage = "The output queue")
    public String outputQueue;

    @Option(name = "--prepopulate-input", aliases = {"-prepi"}, required = false, usage = "Force all input values to be queued before the first value is fed to the topology")
    public boolean prepopulate;

    @Option(name = "--topology-parallelism", aliases = {"-tpar"}, required = false, usage = "The number of tasks ran by each bolt in the topology. This offers the default value for join/filter parallelism")
    public String topologyParallelism;

    @Option(name = "--topology-spout-parallelism", aliases = {"-spar"}, required = false, usage = "The number of tasks ran by each spout in the topology")
    public String topologySpoutParallelism;

    @Option(name = "--topology-join-parallelism", aliases = {"-jpar"}, required = false, usage = "The number of tasks ran by each join bolt in the topology")
    public String topologyJoinParallelism;

    @Option(name = "--topology-filter-parallelism", aliases = {"-fpar"}, required = false, usage = "The number of tasks ran by each filter bolt in the topology")
    public String topologyFilterParallelism;

    @Option(name = "--topology-max-parallelism", aliases = {"-maxpar"}, required = false, usage = "Max parallelism")
    private int maxParallelism;
    private Config preparedConfig;

    public ReteStormOptions(String[] strArr) {
        super(strArr);
        this.topologyName = null;
        this.ruleLanguageMode = RuleLanguageMode.JENA;
        this.ruleLanguageModeOp = null;
        this.mm = MonitorModeOption.NONE;
        this.mmOp = this.mm.mo6getOptions();
        this.triplesInputMode = TriplesInputModeOption.URI;
        this.triplesInputModeOp = this.triplesInputMode.mo8getOptions();
        this.staticDataSource = new ArrayList();
        this.sdm = StaticDataModeOption.IN_MEMORY;
        this.sdmOp = StaticDataModeOption.IN_MEMORY.mo11getOptions();
        this.feedBack = false;
        this.inputQueue = "inputQueue";
        this.outputQueue = "outputQueue";
        this.prepopulate = false;
        this.topologyParallelism = "2";
        this.topologySpoutParallelism = "1";
        this.topologyJoinParallelism = null;
        this.topologyFilterParallelism = null;
        this.maxParallelism = 4;
    }

    public String getExtractUsageInfo() {
        return "";
    }

    public void validate(CmdLineParser cmdLineParser) throws CmdLineException, IOException {
        if (this.topologyName == null) {
            this.topologyName = String.valueOf(this.ruleLanguageMode.toString()) + "_topology_" + System.currentTimeMillis();
        }
        if (getInput() == null) {
            throw new CmdLineException(cmdLineParser, "No input rules provided.");
        }
        File file = new File(getInput());
        if (!file.exists()) {
            throw new CmdLineException(cmdLineParser, "Input rules file does not exist!");
        }
        this.rules = FileUtils.readall(file);
        this.triplesInputModeOp.init(this);
        prepareConfig();
    }

    public StormTopology constructTopology() {
        return this.ruleLanguageModeOp.constructTopology(this);
    }

    public String getRules() {
        return this.rules;
    }

    public KestrelTupleWriter triplesKestrelWriter() throws IOException {
        return this.triplesInputModeOp.asKestrelWriter();
    }

    public Map<String, StaticRDFDataset> staticDataSources() {
        HashMap hashMap = new HashMap();
        Iterator<String> it = this.staticDataSource.iterator();
        while (it.hasNext()) {
            String[] split = it.next().split("=");
            hashMap.put(split[0], split[1]);
        }
        return this.sdmOp.datasets(hashMap);
    }

    public List<KestrelServerSpec> getKestrelSpecList() {
        return this.kestrelSpecList;
    }

    public void populateInputs() throws TException, IOException {
        logger.info("Populating kestrel Queues");
        KestrelTupleWriter triplesKestrelWriter = triplesKestrelWriter();
        if (this.feedBack) {
            triplesKestrelWriter.write(this.kestrelSpecList, new String[]{this.inputQueue, this.outputQueue});
        } else {
            triplesKestrelWriter.write(this.kestrelSpecList, new String[]{this.inputQueue});
        }
    }

    public void prepareQueues() throws TException {
        if (this.triplesInputMode == TriplesInputModeOption.NONE) {
            logger.info("Not touching queues");
            return;
        }
        logger.info("Preparing Kestrel Queues");
        Iterator it = this.kestrelSpecList.iterator();
        while (it.hasNext()) {
            KestrelUtils.deleteQueues((KestrelServerSpec) it.next(), new String[]{this.inputQueue, this.outputQueue});
        }
    }

    public Config prepareConfig() {
        if (this.preparedConfig == null) {
            this.preparedConfig = new Config();
            this.preparedConfig.setMaxSpoutPending(100);
            this.preparedConfig.put("topology.rete.parallelism", this.topologyParallelism);
            this.preparedConfig.put("topology.rete.join.parallelism", this.topologyJoinParallelism);
            this.preparedConfig.put("topology.rete.filter.parallelism", this.topologyFilterParallelism);
            this.preparedConfig.put("topology.rete.spout.parallelism", this.topologySpoutParallelism);
            this.ruleLanguageModeOp.initConfig(this.preparedConfig);
            this.preparedConfig.setNumWorkers(this.numberOfWorkers);
            this.preparedConfig.setMaxTaskParallelism(this.maxParallelism);
            this.preparedConfig.setFallBackOnJavaSerialization(false);
            this.preparedConfig.setSkipMissingKryoRegistrations(false);
            JenaStormUtils.registerSerializers(this.preparedConfig);
        }
        return this.preparedConfig;
    }

    public void initMonitor() throws IOException {
        if (this.mmOp != null) {
            logger.debug("Initialising monitor");
            this.mmOp.init(this, prepareConfig());
        }
    }

    public void startMonitor() {
        if (this.mmOp != null) {
            logger.debug("Starting monitor");
            Thread thread = new Thread(this.mmOp);
            thread.setDaemon(true);
            thread.start();
        }
    }

    public String topologyName() {
        return this.topologyName;
    }

    public void topologyCleanup() {
        if (this.tm == TopologyModeOption.LOCAL) {
            this.mmOp.close();
        }
    }
}
