package org.openimaj.rdf.storm.sparql.topology;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import eu.larkc.csparql.streams.formats.TranslationException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.thrift7.TException;
import org.openimaj.io.FileUtils;
import org.openimaj.rdf.storm.eddying.topology.builder.ExampleEddySteMTopologyBuilder;
import org.openimaj.rdf.storm.sparql.topology.builder.SPARQLReteTopologyBuilder;
import org.openimaj.rdf.storm.sparql.topology.builder.group.StaticDataFileNTriplesSPARQLReteTopologyBuilder;
import org.openimaj.rdf.storm.topology.builder.ReteTopologyBuilder;
import org.openimaj.rdf.storm.utils.CsparqlUtils;
import org.openimaj.rdf.storm.utils.JenaStormUtils;

/* loaded from: input_file:org/openimaj/rdf/storm/sparql/topology/StormSPARQLReteTopologyOrchestrator.class */
/**
 * Parses a C-SPARQL query and turns it into a Storm topology by handing the
 * parsed query to a {@link SPARQLReteTopologyBuilder}.
 *
 * <p>An instance holds the Storm {@link Config} it was created with, the
 * parsed query, and optionally the builder used to compile the query. When no
 * builder was supplied, {@link #buildTopology()} falls back to a
 * {@link StaticDataFileNTriplesSPARQLReteTopologyBuilder} with no arguments.
 */
public class StormSPARQLReteTopologyOrchestrator {
    /** Name constant for a debug bolt. */
    public static final String DEBUG_BOLT = "debugBolt";
    /**
     * Query template selecting every triple from a stream.
     * Contains a {@code %s} placeholder for the stream IRI.
     */
    public static final String SELECT_ALL = "REGISTER QUERY ALLQ AS SELECT ?a ?b ?c FROM STREAM <%s> [RANGE 1m STEP 1m] WHERE {?a ?b ?c.}";
    /** Name constant for the final terminal. */
    public static final String FINAL_TERMINAL = "final_term";
    private static final Logger logger = Logger.getLogger(StormSPARQLReteTopologyOrchestrator.class);
    private CsparqlUtils.CSparqlComponentHolder query;
    private Config conf;
    private SPARQLReteTopologyBuilder builder;

    /**
     * Creates an orchestrator for the {@link #SELECT_ALL} query with no explicit builder.
     *
     * <p>NOTE(review): {@link #SELECT_ALL} is parsed as-is, i.e. with its
     * {@code %s} placeholder unsubstituted — confirm this is intended.
     *
     * @param config the Storm configuration; serializers are registered on it and it is retained
     * @throws TranslationException if the query cannot be parsed
     * @throws IOException if parsing fails on I/O
     */
    public StormSPARQLReteTopologyOrchestrator(Config config) throws TranslationException, IOException {
        this(config, SELECT_ALL);
    }

    /**
     * Creates an orchestrator for the given query with no explicit builder.
     *
     * @param config the Storm configuration; serializers are registered on it and it is retained
     * @param str the C-SPARQL query text
     * @throws TranslationException if the query cannot be parsed
     * @throws IOException if parsing fails on I/O
     */
    public StormSPARQLReteTopologyOrchestrator(Config config, String str) throws TranslationException, IOException {
        this(config, str, null);
    }

    /**
     * Creates an orchestrator for the given query and builder.
     *
     * @param config the Storm configuration; serializers are registered on it and it is retained
     * @param str the C-SPARQL query text
     * @param sPARQLReteTopologyBuilder the builder used by {@link #buildTopology()};
     *        may be {@code null}, in which case a default builder is used
     * @throws TranslationException if the query cannot be parsed
     * @throws IOException if parsing fails on I/O
     */
    public StormSPARQLReteTopologyOrchestrator(Config config, String str, SPARQLReteTopologyBuilder sPARQLReteTopologyBuilder) throws TranslationException, IOException {
        JenaStormUtils.registerSerializers(config);
        this.conf = config;
        this.query = CsparqlUtils.parse(str);
        this.builder = sPARQLReteTopologyBuilder;
    }

    /**
     * Builds the topology with the builder given at construction time, or with a
     * {@link StaticDataFileNTriplesSPARQLReteTopologyBuilder} if none was given.
     * The chosen builder receives this orchestrator's configuration first.
     *
     * @return the compiled Storm topology
     */
    public StormTopology buildTopology() {
        // Resolve the builder BEFORE touching it: dereferencing this.builder
        // unconditionally made the default-builder fallback unreachable (NPE).
        SPARQLReteTopologyBuilder effectiveBuilder = this.builder;
        if (effectiveBuilder == null) {
            effectiveBuilder = new StaticDataFileNTriplesSPARQLReteTopologyBuilder(new String[0]);
        }
        effectiveBuilder.setConfig(this.conf);
        return buildTopology(effectiveBuilder);
    }

    /**
     * Compiles the parsed query into a fresh {@link TopologyBuilder} using the given builder.
     *
     * @param sPARQLReteTopologyBuilder the builder that compiles the query
     * @return the topology created from the populated {@link TopologyBuilder}
     */
    public StormTopology buildTopology(SPARQLReteTopologyBuilder sPARQLReteTopologyBuilder) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        sPARQLReteTopologyBuilder.compile(topologyBuilder, this.query);
        return topologyBuilder.createTopology();
    }

    /**
     * Creates an orchestrator from a caller-supplied configuration, builder and query.
     *
     * @param config the Storm configuration
     * @param sPARQLReteTopologyBuilder the builder used to compile the query
     * @param str the C-SPARQL query text
     * @return the new orchestrator
     * @throws TranslationException if the query cannot be parsed
     * @throws IOException if parsing fails on I/O
     */
    public static StormSPARQLReteTopologyOrchestrator createTopologyBuilder(Config config, SPARQLReteTopologyBuilder sPARQLReteTopologyBuilder, String str) throws TranslationException, IOException {
        return new StormSPARQLReteTopologyOrchestrator(config, str, sPARQLReteTopologyBuilder);
    }

    /**
     * Creates an orchestrator from a builder and query, using a freshly created
     * configuration (debug off, one worker, Java-serialization fallback disabled,
     * missing Kryo registrations not skipped).
     *
     * @param sPARQLReteTopologyBuilder the builder used to compile the query
     * @param str the C-SPARQL query text
     * @return the new orchestrator
     * @throws TranslationException if the query cannot be parsed
     * @throws IOException if parsing fails on I/O
     */
    public static StormSPARQLReteTopologyOrchestrator createTopologyBuilder(SPARQLReteTopologyBuilder sPARQLReteTopologyBuilder, String str) throws TranslationException, IOException {
        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(1);
        config.setMaxSpoutPending(ExampleEddySteMTopologyBuilder.STEMSIZE);
        config.setMaxTaskParallelism(16);
        config.put(SPARQLReteTopologyBuilder.RETE_TOPOLOGY_PARALLELISM, "2");
        config.setFallBackOnJavaSerialization(false);
        config.setSkipMissingKryoRegistrations(false);
        return new StormSPARQLReteTopologyOrchestrator(config, str, sPARQLReteTopologyBuilder);
    }

    /**
     * Reads the whole stream as the query text and delegates to
     * {@link #createTopologyBuilder(SPARQLReteTopologyBuilder, String)}.
     *
     * @param sPARQLReteTopologyBuilder the builder used to compile the query
     * @param inputStream the stream holding the query text
     * @return the new orchestrator
     * @throws TranslationException if the query cannot be parsed
     * @throws IOException if the stream cannot be read
     */
    public static StormSPARQLReteTopologyOrchestrator createTopologyBuilder(SPARQLReteTopologyBuilder sPARQLReteTopologyBuilder, InputStream inputStream) throws TranslationException, IOException {
        return createTopologyBuilder(sPARQLReteTopologyBuilder, FileUtils.readall(inputStream));
    }

    /**
     * Reads the whole stream as a query template and delegates to
     * {@link #createTopologyBuilder(SPARQLReteTopologyBuilder, String, Map)}.
     *
     * @param sPARQLReteTopologyBuilder the builder used to compile the query
     * @param inputStream the stream holding the query template
     * @param map the values passed to the templating step
     * @return the new orchestrator
     * @throws TranslationException if the query cannot be parsed
     * @throws IOException if the stream cannot be read
     */
    public static StormSPARQLReteTopologyOrchestrator createTopologyBuilder(SPARQLReteTopologyBuilder sPARQLReteTopologyBuilder, InputStream inputStream, Map<String, String> map) throws TranslationException, IOException {
        return createTopologyBuilder(sPARQLReteTopologyBuilder, FileUtils.readall(inputStream), map);
    }

    /**
     * Runs the query template through {@code CsparqlUtils.templaceQuery} with the
     * given values and delegates to
     * {@link #createTopologyBuilder(SPARQLReteTopologyBuilder, String)}.
     *
     * @param sPARQLReteTopologyBuilder the builder used to compile the query
     * @param str the query template
     * @param map the values passed to the templating step
     * @return the new orchestrator
     * @throws TranslationException if the query cannot be parsed
     * @throws IOException if parsing fails on I/O
     */
    public static StormSPARQLReteTopologyOrchestrator createTopologyBuilder(SPARQLReteTopologyBuilder sPARQLReteTopologyBuilder, String str, Map<String, String> map) throws TranslationException, IOException {
        return createTopologyBuilder(sPARQLReteTopologyBuilder, CsparqlUtils.templaceQuery(str, map));
    }

    /** Reads the whole stream as the query text and delegates to {@link #createTopologyBuilder(String)}. */
    private static StormSPARQLReteTopologyOrchestrator createTopologyBuilder(InputStream inputStream) throws IOException, TranslationException {
        return createTopologyBuilder(FileUtils.readall(inputStream));
    }

    /**
     * Creates an orchestrator for the given query with no explicit builder, using a
     * freshly created configuration (debug off, two workers, max spout pending 1,
     * Java-serialization fallback disabled, missing Kryo registrations not skipped).
     *
     * @param str the C-SPARQL query text
     * @return the new orchestrator
     * @throws TranslationException if the query cannot be parsed
     * @throws IOException if parsing fails on I/O
     */
    public static StormSPARQLReteTopologyOrchestrator createTopologyBuilder(String str) throws TranslationException, IOException {
        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(2);
        config.setMaxSpoutPending(1);
        config.setFallBackOnJavaSerialization(false);
        config.setSkipMissingKryoRegistrations(false);
        return new StormSPARQLReteTopologyOrchestrator(config, str);
    }

    /**
     * @return the parsed query held by this orchestrator
     */
    public CsparqlUtils.CSparqlComponentHolder getQuery() {
        return this.query;
    }

    /**
     * Loads {@code /test.csparql} from the classpath, builds its topology, runs it
     * on a local cluster for 100 seconds, then kills the topology and shuts the
     * cluster down.
     *
     * @param strArr ignored
     * @throws IOException if the query resource is missing or cannot be read
     */
    public static void main(String[] strArr) throws AlreadyAliveException, InvalidTopologyException, TException, IOException, TranslationException {
        InputStream queryStream = ReteTopologyBuilder.class.getResourceAsStream("/test.csparql");
        if (queryStream == null) {
            // getResourceAsStream signals a missing resource with null, not an exception.
            throw new IOException("Classpath resource /test.csparql not found");
        }
        StormSPARQLReteTopologyOrchestrator orchestrator;
        try {
            orchestrator = createTopologyBuilder(queryStream);
        } finally {
            queryStream.close();
        }
        LocalCluster localCluster = new LocalCluster();
        try {
            StormTopology topology = orchestrator.buildTopology();
            System.out.println(topology);
            localCluster.submitTopology("reteTopology", orchestrator.conf, topology);
            Utils.sleep(100000L);
            localCluster.killTopology("reteTopology");
        } finally {
            // Always release the in-process cluster, even if build/submit/kill failed.
            localCluster.shutdown();
        }
    }

    /**
     * @return the Storm configuration this orchestrator was created with
     */
    public Config getConfiguration() {
        return this.conf;
    }
}
