Java – storm hdfs connector… trying to use Storm to write data to HDFS

storm hdfs connector… trying to use Storm to write data to HDFS: here is a solution to the problem.

storm hdfs connector… trying to use Storm to write data to HDFS

I'm trying to write data to HDFS using the storm-hdfs connector 0.1.3 (GitHub: https://github.com/ptgoetz/storm-hdfs). I have added this dependency to my Maven project:

<dependency>
      <groupId>com.github.ptgoetz</groupId>
      <artifactId>storm-hdfs</artifactId>
      <version>0.1.3-SNAPSHOT</version>
      <scope>provided</scope>
</dependency>

A sample topology for writing data to HDFS is provided in the storm-hdfs project itself. I just modified it to match my file location. HdfsFileTopology is:

package my.company.app;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.AbstractHdfsBolt;
import org.apache.storm.hdfs.bolt.SequenceFileBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.hdfs.common.rotation.MoveFileAction;
import org.yaml.snakeyaml.Yaml;

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class HdfsFileTopology {
static final String SENTENCE_SPOUT_ID = "sentence-spout";
static final String BOLT_ID = "my-bolt";
static final String TOPOLOGY_NAME = "test-topology";

public static void main(String[] args) throws Exception {
    Config config = new Config();
    config.setNumWorkers(1);

    SentenceSpout spout = new SentenceSpout();

    // sync the filesystem after every 1k tuples
    SyncPolicy syncPolicy = new CountSyncPolicy(1000);

    // rotate files every minute
    FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimedRotationPolicy.TimeUnit.MINUTES);

    FileNameFormat fileNameFormat = new DefaultFileNameFormat()
            .withPath("/users/storm/")
            .withExtension(".txt");

    // use "|" instead of "," for field delimiter
    RecordFormat format = new DelimitedRecordFormat()
            .withFieldDelimiter("|");

    Yaml yaml = new Yaml();
    InputStream in = new FileInputStream(args[1]);
    Map<String, Object> yamlConf = (Map<String, Object>) yaml.load(in);
    in.close();
    config.put("hdfs.config", yamlConf);

    HdfsBolt bolt = new HdfsBolt()
            .withConfigKey("hdfs.config")
            .withFsUrl(args[0])
            .withFileNameFormat(fileNameFormat)
            .withRecordFormat(format)
            .withRotationPolicy(rotationPolicy)
            .withSyncPolicy(syncPolicy)
            .addRotationAction(new MoveFileAction().toDestination("/dest2/"));

    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout(SENTENCE_SPOUT_ID, spout, 1);
    // SentenceSpout --> HdfsBolt
    builder.setBolt(BOLT_ID, bolt, 4)
            .shuffleGrouping(SENTENCE_SPOUT_ID);

    if (args.length == 2) {
        LocalCluster cluster = new LocalCluster();

        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
        waitForSeconds(120);
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();
        System.exit(0);
    } else if (args.length == 3) {
        StormSubmitter.submitTopology(args[0], config, builder.createTopology());
    } else {
        System.out.println("Usage: HdfsFileTopology [topology name] <yaml config file>");
    }
}

public static void waitForSeconds(int seconds) {
    try {
        Thread.sleep(seconds * 1000);
    } catch (InterruptedException e) {
    }
}

public static class SentenceSpout extends BaseRichSpout {
    private ConcurrentHashMap<UUID, Values> pending;
    private SpoutOutputCollector collector;
    private String[] sentences = {
            "my dog has fleas",
            "i like cold beverages",
            "the dog ate my homework",
            "don't have a cow man",
            "i don't think i like fleas"
    };
    private int index = 0;
    private int count = 0;
    private long total = 0L;

public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence", "timestamp"));
    }

public void open(Map config, TopologyContext context,
                     SpoutOutputCollector collector) {
        this.collector = collector;
        this.pending = new ConcurrentHashMap<UUID, Values>();
    }

public void nextTuple() {
        Values values = new Values(sentences[index], System.currentTimeMillis());
        UUID msgId = UUID.randomUUID();
        this.pending.put(msgId, values);
        this.collector.emit(values, msgId);
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        count++;
        total++;
        if(count > 20000){
            count = 0;
            System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
        }
        Thread.yield();
    }

public void ack(Object msgId) {
        this.pending.remove(msgId);
    }

public void fail(Object msgId) {
        System.out.println("**** RESENDING FAILED TUPLE");
        this.collector.emit(this.pending.get(msgId), msgId);
    }
}

public static class MyBolt extends BaseRichBolt {

    private HashMap<String, Long> counts = null;
    private OutputCollector collector;

public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.counts = new HashMap<String, Long>();
        this.collector = collector;
    }

public void execute(Tuple tuple) {
        collector.ack(tuple);
    }

public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this bolt does not emit anything
    }

@Override
    public void cleanup() {
    }
}
}

I compiled the project with Maven (group ID: my.company.app) and the build succeeded, but when I submit the jar to Storm it throws an error.
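
For reference, I submit the jar roughly like this (the HDFS URL and YAML config path are placeholders, passed straight through to the topology's main method):

storm jar target/hdfs_example-1.0-SNAPSHOT.jar my.company.app.HdfsFileTopology <hdfs url> <yaml config file>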

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/storm/hdfs/bolt/format/FileNameFormat
Caused by: java.lang.ClassNotFoundException: org.apache.storm.hdfs.bolt.format.FileNameFormat

Even though I have included this class, why does it throw an error saying it cannot be found?
Any help on how to use Storm to write data to HDFS would be appreciated.

pom.xml, added as requested:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

<groupId>com.mycompany.app</groupId>
  <artifactId>hdfs_example</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

<name>hdfs_example</name>
  <url>http://maven.apache.org</url>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

<repositories>
        <repository>
          <id>Codehaus</id>
          <url>http://repository.codehaus.org</url>
        </repository>

<repository>
          <id>Codehaus.Snapshots</id>
          <url>http://snapshots.repository.codehaus.org</url>
          <snapshots><enabled>true</enabled></snapshots>
        </repository>

<repository>
            <id>github-releases</id>
            <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
        </repository>

<repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
    </repositories>

<dependencies>
    <dependency>
      <groupId>org.testng</groupId>
      <artifactId>testng</artifactId>
      <version>6.8.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-all</artifactId>
      <version>1.9.0</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.easytesting</groupId>
      <artifactId>fest-assert-core</artifactId>
      <version>2.0M8</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.jmock</groupId>
      <artifactId>jmock</artifactId>
      <version>2.6.0</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>0.9.2-incubating</version>
      <scope>provided</scope>
    </dependency>

<dependency>
      <groupId>commons-collections</groupId>
      <artifactId>commons-collections</artifactId>
      <version>3.2.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>15.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
    </dependency>
    <dependency>
      <groupId>com.github.ptgoetz</groupId>
      <artifactId>storm-hdfs</artifactId>
      <version>0.1.3-SNAPSHOT</version>
      <scope>provided</scope>
    </dependency>

</dependencies>

<build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <testSourceDirectory>test/main/java</testSourceDirectory>
    <resources>
      <resource>
        <directory>${basedir}/multilang</directory>
      </resource>
    </resources>
    <plugins>
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.4</version>
    <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass></mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
    </plugin>
    <!--  
    <plugin>
        <groupId>com.theoryinpractise</groupId>
        <artifactId>clojure-maven-plugin</artifactId>
        <version>1.3.12</version>
        <extensions>true</extensions>
        <configuration>
          <sourceDirectories>
            <sourceDirectory>src/clj</sourceDirectory>
          </sourceDirectories>
        </configuration>
        <executions>
          <execution>
            <id>compile</id>
            <phase>compile</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
          <execution>
            <id>test</id>
            <phase>test</phase>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
   -->
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.2.1</version>
        <executions>
          <execution>
            <goals>
              <goal>exec</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <executable>java</executable>
          <includeProjectDependencies>true</includeProjectDependencies>
          <includePluginDependencies>false</includePluginDependencies>
          <classpathScope>compile</classpathScope>
          <mainClass>${storm.topology}</mainClass>
        </configuration>
      </plugin>

<plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>

</plugins>
  </build>  
</project>

Solution

I had the same issue. After examining the pom.xml in detail, I realized that the storm-hdfs dependency (<version>0.1.3-SNAPSHOT</version>) has its scope set to provided. I think this means the jar is expected to already be on Storm's classpath (i.e. added to Storm's own lib), so Maven does not package it into the topology jar.

I changed the version to one available in the Maven repository and removed the scope, which forces Maven to download the jar and include it in the topology jar during the build.
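
In short, the change to the storm-hdfs dependency amounts to this (the hadoop exclusions shown in the full excerpt below are omitted here for brevity):

<!-- before: provided scope keeps storm-hdfs out of the topology jar -->
<dependency>
  <groupId>com.github.ptgoetz</groupId>
  <artifactId>storm-hdfs</artifactId>
  <version>0.1.3-SNAPSHOT</version>
  <scope>provided</scope>
</dependency>

<!-- after: a released version, bundled into the jar by the shade plugin -->
<dependency>
  <groupId>com.github.ptgoetz</groupId>
  <artifactId>storm-hdfs</artifactId>
  <version>0.1.2</version>
</dependency>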

Here is my pom.xml for reference (some basic details removed):

My src/main/resources/ folder contains:

core-site.xml
hdfs-site.xml

<plugins>
  <plugin>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.2</version>
    <configuration>
      <source>1.7</source>
      <target>1.7</target>
    </configuration>
  </plugin>

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.4</version>
    <configuration>
      <createDependencyReducedPom>true</createDependencyReducedPom>
    </configuration>
    <executions>
      <execution>
        <phase>package</phase>
        <goals>
          <goal>shade</goal>
        </goals>
        <configuration>
          <transformers>
            <transformer
                    implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            <transformer
                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <mainClass>com.company.main</mainClass>
            </transformer>
          </transformers>
        </configuration>
      </execution>
    </executions>
  </plugin>

</plugins>

<dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>3.8.1</version>
  <scope>test</scope>
</dependency>

<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>0.9.2-incubating</version>
  <!-- keep storm out of the jar-with-dependencies -->
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-kafka</artifactId>
  <version>0.9.2-incubating</version>
</dependency>

<dependency>
  <groupId>log4j</groupId>
  <artifactId>log4j</artifactId>
  <version>1.2.17</version>
</dependency>

<!-- Utilities -->
<dependency>
  <groupId>commons-collections</groupId>
  <artifactId>commons-collections</artifactId>
  <version>3.2.1</version>
</dependency>

<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>15.0</version>
</dependency>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.8.1.1</version>
  <exclusions>
    <exclusion>
      <groupId>javax.jms</groupId>
      <artifactId>jms</artifactId>
    </exclusion>
    <exclusion>
      <groupId>com.sun.jdmk</groupId>
      <artifactId>jmxtools</artifactId>
    </exclusion>
    <exclusion>
      <groupId>com.sun.jmx</groupId>
      <artifactId>jmxri</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
    </exclusion>
    <exclusion>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
    </exclusion>
    <exclusion>
      <groupId>com.101tec</groupId>
      <artifactId>zkclient</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<!-- our cluster hadoop version -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.4.0</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<!-- our cluster hadoop version -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs</artifactId>
  <version>2.4.0</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<!-- apache hdfs-bolt related dependencies -->
<dependency>
  <groupId>com.github.ptgoetz</groupId>
  <artifactId>storm-hdfs</artifactId>
  <version>0.1.2</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
    </exclusion>
  </exclusions>
</dependency>
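
With the provided scope gone, mvn package pulls the storm-hdfs classes into the shaded jar (it should now contain org/apache/storm/hdfs/bolt/format/FileNameFormat.class), and the NoClassDefFoundError no longer occurs.

One more note on configuration: the map the topology registers under the "hdfs.config" key (which the bolt reads via withConfigKey) does not have to come from a YAML file; it can also be built directly in code. A minimal sketch, assuming the same key as in the topology above, with a placeholder Hadoop property:

    // Build the HDFS client overrides in code instead of loading them from YAML.
    // The map must be registered under the same key passed to HdfsBolt.withConfigKey(...).
    Map<String, Object> hdfsConf = new HashMap<String, Object>();
    hdfsConf.put("dfs.replication", "1"); // placeholder property and value
    config.put("hdfs.config", hdfsConf);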
