package org.openimaj.rdf.storm.topology;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import com.hp.hpl.jena.reasoner.rulesys.Rule;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.util.List;
import org.apache.log4j.Logger;
import org.apache.thrift7.TException;
import org.openimaj.kestrel.KestrelServerSpec;
import org.openimaj.rdf.storm.topology.bolt.ReteConflictSetBolt;
import org.openimaj.rdf.storm.topology.builder.ConfigurableRuleReteTopologyBuilder;
import org.openimaj.rdf.storm.topology.builder.KestrelReteTopologyBuilder;
import org.openimaj.rdf.storm.topology.builder.NTriplesReteTopologyBuilder;
import org.openimaj.rdf.storm.topology.builder.ReteTopologyBuilder;
import org.openimaj.rdf.storm.utils.JenaStormUtils;

/* loaded from: input_file:org/openimaj/rdf/storm/topology/RuleReteStormTopologyFactory.class */
/**
 * Factory that parses Jena {@link Rule}s from a stream or a string and hands
 * them to a {@link ReteTopologyBuilder}, which compiles them into a
 * {@link StormTopology}.
 *
 * <p>The rule text is read from its stream at most once and retained, so the
 * same factory instance can build any number of topologies.
 */
public class RuleReteStormTopologyFactory {
    /**
     * The identifier {@code "debugBolt"}. Not referenced in this class;
     * presumably a component name used by the topology builders (confirm there).
     */
    public static final String DEBUG_BOLT = "debugBolt";
    /** Classpath resource holding the rules used by {@link #RuleReteStormTopologyFactory(Config)}. */
    public static final String RDFS_RULES = "/org/openimaj/rdf/rules/rdfs-fb-tgc-noresource.rules";
    /**
     * The identifier {@code "final_term"}. Not referenced in this class;
     * presumably a component name used by the topology builders (confirm there).
     */
    public static final String FINAL_TERMINAL = "final_term";

    /** Rules file used by {@link #main(String[])} when no first argument is given. */
    private static final String DEFAULT_RULES_FILE = "/Users/ss/Development/java/openimaj/trunk/storm/ReteStorm/src/test/resources/test.rules";
    /** Triples location used by {@link #main(String[])} when no second argument is given. */
    private static final String DEFAULT_TRIPLES_URI = "file:///Users/ss/Development/java/openimaj/trunk/storm/ReteStorm/src/test/resources/test.rdfs";
    /** Name under which {@link #main(String[])} submits and kills its topology. */
    private static final String LOCAL_TOPOLOGY_NAME = "reteTopology";

    private static final Logger logger = Logger.getLogger(RuleReteStormTopologyFactory.class);

    /** Unread rule source; set to {@code null} once its text has been captured in {@link #rulesString}. */
    private InputStream rulesStream;
    /** Rule text, either supplied directly or captured from {@link #rulesStream} on first use. */
    private String rulesString;
    /** {@code true} when this class opened {@link #rulesStream} itself and must therefore close it. */
    private final boolean ownsRulesStream;

    /**
     * Creates a factory that uses the rules in the {@link #RDFS_RULES}
     * classpath resource. If the resource is missing, the failure surfaces as
     * an {@link IOException} from the first {@code buildTopology} call.
     *
     * @param config the Storm configuration on which the Jena serializers are registered
     */
    public RuleReteStormTopologyFactory(Config config) {
        JenaStormUtils.registerSerializers(config);
        this.rulesStream = RuleReteStormTopologyFactory.class.getResourceAsStream(RDFS_RULES);
        this.ownsRulesStream = true;
    }

    /**
     * Creates a factory that reads its rules from the given stream. The stream
     * is read to its end on the first {@code buildTopology} call; it is not
     * closed here, so closing it remains the caller's responsibility.
     *
     * @param config the Storm configuration on which the Jena serializers are registered
     * @param inputStream the source of the rule text
     */
    public RuleReteStormTopologyFactory(Config config, InputStream inputStream) {
        JenaStormUtils.registerSerializers(config);
        this.rulesStream = inputStream;
        this.ownsRulesStream = false;
    }

    /**
     * Creates a factory that parses its rules from the given text.
     *
     * @param config the Storm configuration on which the Jena serializers are registered
     * @param str the rule text
     */
    public RuleReteStormTopologyFactory(Config config, String str) {
        JenaStormUtils.registerSerializers(config);
        this.rulesStream = null;
        this.rulesString = str;
        this.ownsRulesStream = false;
    }

    /**
     * Builds a topology using a {@link NTriplesReteTopologyBuilder}.
     *
     * @param str the argument passed to the {@link NTriplesReteTopologyBuilder}
     *            constructor ({@link #main(String[])} passes a {@code file://} URI)
     * @return the compiled topology
     * @throws IOException if the rules cannot be loaded
     */
    public StormTopology buildTopology(String str) throws IOException {
        return buildTopology(new NTriplesReteTopologyBuilder(str));
    }

    /**
     * Builds a topology using a {@link KestrelReteTopologyBuilder}.
     *
     * @param kestrelServerSpec first argument of the {@link KestrelReteTopologyBuilder} constructor
     * @param str second argument of the {@link KestrelReteTopologyBuilder} constructor
     * @param str2 third argument of the {@link KestrelReteTopologyBuilder} constructor
     * @return the compiled topology
     * @throws IOException if the rules cannot be loaded
     */
    public StormTopology buildTopology(KestrelServerSpec kestrelServerSpec, String str, String str2) throws IOException {
        return buildTopology(new KestrelReteTopologyBuilder(kestrelServerSpec, str, str2));
    }

    /**
     * Builds a topology using a {@link ConfigurableRuleReteTopologyBuilder}
     * constructed from the given spout and conflict-set bolt.
     *
     * @param iRichSpout the spout handed to the builder
     * @param reteConflictSetBolt the conflict-set bolt handed to the builder
     * @return the compiled topology
     * @throws IOException if the rules cannot be loaded
     */
    public StormTopology buildTopology(IRichSpout iRichSpout, ReteConflictSetBolt reteConflictSetBolt) throws IOException {
        return buildTopology(new ConfigurableRuleReteTopologyBuilder(iRichSpout, reteConflictSetBolt));
    }

    /**
     * Compiles this factory's rules into a fresh {@link TopologyBuilder} using
     * the given Rete builder and returns the resulting topology.
     *
     * @param reteTopologyBuilder the builder that compiles the rules
     * @return the compiled topology
     * @throws IOException if the rules cannot be loaded
     */
    public StormTopology buildTopology(ReteTopologyBuilder reteTopologyBuilder) throws IOException {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        reteTopologyBuilder.compile(topologyBuilder, loadRules());
        return topologyBuilder.createTopology();
    }

    /**
     * Convenience shortcut: creates a factory over {@code inputStream} and
     * builds a topology with {@code reteTopologyBuilder}.
     *
     * @param config the Storm configuration on which the Jena serializers are registered
     * @param reteTopologyBuilder the builder that compiles the rules
     * @param inputStream the source of the rule text; not closed by this method
     * @return the compiled topology
     * @throws IOException if the rules cannot be loaded
     */
    public static StormTopology buildTopology(Config config, ReteTopologyBuilder reteTopologyBuilder, InputStream inputStream) throws IOException {
        return new RuleReteStormTopologyFactory(config, inputStream).buildTopology(reteTopologyBuilder);
    }

    /**
     * Parses the rules, returning a freshly parsed list on every call.
     *
     * <p>A stream can only be consumed once, so on first use its text is
     * captured in {@link #rulesString}; later calls parse that text instead of
     * re-reading an exhausted stream (which would yield no rules).
     *
     * @return the parsed rules
     * @throws IOException if neither a stream nor a string is available, or reading the stream fails
     */
    private List<Rule> loadRules() throws IOException {
        if (this.rulesStream != null) {
            InputStream source = this.rulesStream;
            try {
                this.rulesString = readFully(source);
                this.rulesStream = null;
            } finally {
                // Only close what this class opened; caller-supplied streams stay with the caller.
                if (this.ownsRulesStream) {
                    source.close();
                }
            }
        }
        if (this.rulesString == null) {
            throw new IOException("Couldn't load rules!");
        }
        return Rule.parseRules(Rule.rulesParserFromReader(new BufferedReader(new StringReader(this.rulesString))));
    }

    /**
     * Reads all remaining characters from the stream, decoding with the
     * platform default charset (the same decoding the stream was previously
     * parsed with).
     */
    private static String readFully(InputStream in) throws IOException {
        InputStreamReader reader = new InputStreamReader(in);
        StringBuilder text = new StringBuilder();
        char[] buffer = new char[4096];
        int count;
        while ((count = reader.read(buffer)) != -1) {
            text.append(buffer, 0, count);
        }
        return text.toString();
    }

    /**
     * Runs a Rete topology on an in-process {@link LocalCluster} for ten
     * seconds, then kills it and shuts the cluster down.
     *
     * @param strArr optional arguments: {@code [0]} path of the rules file,
     *               {@code [1]} triples location passed to {@link #buildTopology(String)};
     *               missing arguments fall back to the built-in development paths
     * @throws AlreadyAliveException declared for topology submission
     * @throws InvalidTopologyException declared for topology submission
     * @throws TException declared for cluster calls
     * @throws IOException if the rules file cannot be opened, read or closed
     */
    public static void main(String[] strArr) throws AlreadyAliveException, InvalidTopologyException, TException, IOException {
        String rulesFile = (strArr != null && strArr.length > 0) ? strArr[0] : DEFAULT_RULES_FILE;
        String triplesUri = (strArr != null && strArr.length > 1) ? strArr[1] : DEFAULT_TRIPLES_URI;

        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(2);
        config.setMaxSpoutPending(1);
        config.setFallBackOnJavaSerialization(false);
        config.setSkipMissingKryoRegistrations(false);

        InputStream rulesIn = new FileInputStream(rulesFile);
        try {
            RuleReteStormTopologyFactory factory = new RuleReteStormTopologyFactory(config, rulesIn);
            LocalCluster localCluster = new LocalCluster();
            try {
                localCluster.submitTopology(LOCAL_TOPOLOGY_NAME, config, factory.buildTopology(triplesUri));
                Utils.sleep(10000L);
                localCluster.killTopology(LOCAL_TOPOLOGY_NAME);
            } finally {
                // Shut the cluster down even when building or submitting fails.
                localCluster.shutdown();
            }
        } finally {
            rulesIn.close();
        }
    }
}
