详解Java编写并运行spark应用程序的方法

我们首先提出这样一个简单的需求:

现在要分析某网站的访问日志信息,统计来自不同IP的用户访问的次数,从而通过Geo信息来获得来访用户所在国家地区分布状况。这里我拿我网站的日志记录行示例,如下所示:

121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
121.205.198.92 - - [21/Feb/2014:00:00:11 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
121.205.198.92 - - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html/ HTTP/1.1" 301 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
121.205.198.92 - - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
121.205.241.229 - - [21/Feb/2014:00:00:13 +0800] "GET /archives/526.html HTTP/1.1" 200 12080 "http://shiyanjun.cn/archives/526.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
121.205.241.229 - - [21/Feb/2014:00:00:15 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "http://shiyanjun.cn/archives/526.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"

Java实现Spark应用程序(Application)

我们实现的统计分析程序,有如下几个功能点:

从HDFS读取日志数据文件

将每行的第一个字段(IP地址)抽取出来

统计每个IP地址出现的次数

根据每个IP地址出现的次数进行一个降序排序

根据IP地址,调用GeoIP库获取IP所属国家

打印输出结果,每行的格式:[国家代码] IP地址 频率

下面,看我们使用Java实现的统计分析应用程序代码,如下所示:

package org.shirdrn.spark.job;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.shirdrn.spark.job.maxmind.Country;
import org.shirdrn.spark.job.maxmind.LookupService;
import scala.Serializable;
import scala.Tuple2;
public class IPAddressStats implements Serializable {
  private static final long serialVersionUID = 8533489548835413763L;
  private static final Log LOG = LogFactory.getLog(IPAddressStats.class);
  private static final Pattern SPACE = Pattern.compile(" ");
  private transient LookupService lookupService;
  private transient final String geoIPFile;
  public IPAddressStats(String geoIPFile) {
   this.geoIPFile = geoIPFile;
   try {
    // lookupService: get country code from a IP address
    File file = new File(this.geoIPFile);
    LOG.info("GeoIP file: " + file.getAbsolutePath());
    lookupService = new AdvancedLookupService(file, LookupService.GEOIP_MEMORY_CACHE);
   } catch (IOException e) {
    throw new RuntimeException(e);
   }
  }
  @SuppressWarnings("serial")
  public void stat(String[] args) {
   JavaSparkContext ctx = new JavaSparkContext(args[0], "IPAddressStats",
     System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(IPAddressStats.class));
   JavaRDD<String> lines = ctx.textFile(args[1], 1);
   // splits and extracts ip address filed
   JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String s) {
     // 121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
     // ip address
     return Arrays.asList(SPACE.split(s)[0]);
    }
   });
   // map
   JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) {
     return new Tuple2<String, Integer>(s, 1);
    }
   });
   // reduce
   JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer i1, Integer i2) {
     return i1 + i2;
    }
   });
   List<Tuple2<String, Integer>> output = counts.collect();
   // sort statistics result by value
   Collections.sort(output, new Comparator<Tuple2<String, Integer>>() {
    @Override
    public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
     if(t1._2 < t2._2) {
       return 1;
     } else if(t1._2 > t2._2) {
       return -1;
     }
     return 0;
    }
   });
   writeTo(args, output);
  }
  private void writeTo(String[] args, List<Tuple2<String, Integer>> output) {
   for (Tuple2<?, ?> tuple : output) {
    Country country = lookupService.getCountry((String) tuple._1);
    LOG.info("[" + country.getCode() + "] " + tuple._1 + "\t" + tuple._2);
   }
  }
  public static void main(String[] args) {
   // ./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
   if (args.length < 3) {
    System.err.println("Usage: IPAddressStats <master> <inFile> <GeoIPFile>");
    System.err.println(" Example: org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat");
    System.exit(1);
   }
   String geoIPFile = args[2];
   IPAddressStats stats = new IPAddressStats(geoIPFile);
   stats.stat(args);
   System.exit(0);
  }
}

具体实现逻辑,可以参考代码中的注释。我们使用Maven管理构建Java程序,首先看一下我的pom配置中所依赖的软件包,如下所示:

<dependencies>
   <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>0.9.0-incubating</version>
   </dependency>
   <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.16</version>
   </dependency>
   <dependency>
    <groupId>dnsjava</groupId>
    <artifactId>dnsjava</artifactId>
    <version>2.1.1</version>
   </dependency>
   <dependency>
    <groupId>commons-net</groupId>
    <artifactId>commons-net</artifactId>
    <version>3.1</version>
   </dependency>
   <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>1.2.1</version>
   </dependency>
  </dependencies>

需要说明的是,当我们将程序在Spark集群上运行时,它要求我们的编写的Job能够进行序列化,如果某些字段不需要序列化或者无法序列化,可以直接使用transient修饰即可,如上面的属性lookupService没有实现序列化接口,使用transient使其不执行序列化,否则的话,可能会出现类似如下的错误:

14/03/10 22:34:06 INFO scheduler.DAGScheduler: Failed to run collect at IPAddressStats.java:76
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.shirdrn.spark.job.IPAddressStats
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:741)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:740)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:740)
  at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

在Spark集群上运行Java程序

这里,我使用了Maven管理构建Java程序,实现上述代码以后,使用Maven的maven-assembly-plugin插件,配置内容如下所示:

<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
   <archive>
    <manifest>
     <mainClass>org.shirdrn.spark.job.UserAgentStats</mainClass>
    </manifest>
   </archive>
   <descriptorRefs>
    <descriptorRef>jar-with-dependencies</descriptorRef>
   </descriptorRefs>
   <excludes>
    <exclude>*.properties</exclude>
    <exclude>*.xml</exclude>
   </excludes>
  </configuration>
  <executions>
   <execution>
    <id>make-assembly</id>
    <phase>package</phase>
    <goals>
     <goal>single</goal>
    </goals>
   </execution>
  </executions>
</plugin>

将相关依赖库文件都打进程序包里面,最后拷贝JAR文件到Linux系统下(不一定非要在Spark集群的Master节点上),保证该节点上Spark的环境变量配置正确即可看。Spark软件发行包解压缩后,可以看到脚本bin/run-example,我们可以直接修改该脚本,将对应的路径指向我们实现的Java程序包(修改变量EXAMPLES_DIR以及我们的JAR文件存放位置相关的内容),使用该脚本就可以运行,脚本内容如下所示:

cygwin=false
case "`uname`" in
 CYGWIN*) cygwin=true;;
esac
SCALA_VERSION=2.10
# Figure out where the Scala framework is installed
FWDIR="$(cd `dirname $0`/..; pwd)"
# Export this as SPARK_HOME
export SPARK_HOME="$FWDIR"
# Load environment variables from conf/spark-env.sh, if it exists
if [ -e "$FWDIR/conf/spark-env.sh" ] ; then
 . $FWDIR/conf/spark-env.sh
fi
if [ -z "$1" ]; then
 echo "Usage: run-example <example-class> [<args>]" >&2
 exit 1
fi
# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
# to avoid the -sources and -doc packages that are built by publish-local.
EXAMPLES_DIR="$FWDIR"/java-examples
SPARK_EXAMPLES_JAR=""
if [ -e "$EXAMPLES_DIR"/*.jar ]; then
 export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR"/*.jar`
fi
if [[ -z $SPARK_EXAMPLES_JAR ]]; then
 echo "Failed to find Spark examples assembly in $FWDIR/examples/target" >&2
 echo "You need to build Spark with sbt/sbt assembly before running this program" >&2
 exit 1
fi
# Since the examples JAR ideally shouldn't include spark-core (that dependency should be
# "provided"), also add our standard Spark classpath, built using compute-classpath.sh.
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
CLASSPATH="$SPARK_EXAMPLES_JAR:$CLASSPATH"
if $cygwin; then
 CLASSPATH=`cygpath -wp $CLASSPATH`
 export SPARK_EXAMPLES_JAR=`cygpath -w $SPARK_EXAMPLES_JAR`
fi
# Find java binary
if [ -n "${JAVA_HOME}" ]; then
 RUNNER="${JAVA_HOME}/bin/java"
else
 if [ `command -v java` ]; then
 RUNNER="java"
 else
 echo "JAVA_HOME is not set" >&2
 exit 1
 fi
fi
# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$SPARK_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
# Load extra JAVA_OPTS from conf/java-opts, if it exists
if [ -e "$FWDIR/conf/java-opts" ] ; then
 JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
fi
export JAVA_OPTS
if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
 echo -n "Spark Command: "
 echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
 echo "========================================"
 echo
fi
exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"

在Spark上运行我们开发的Java程序,执行如下命令:

cd /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1
./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat

我实现的程序类org.shirdrn.spark.job.IPAddressStats运行需要3个参数:

Spark集群主节点URL:例如我的是spark://m1:7077

输入文件路径:业务相关的,我这里是从HDFS上读取文件hdfs://m1:9000/user/shirdrn/wwwlog20140222.log

GeoIP库文件:业务相关的,用来计算IP地址所属国家的外部文件

如果程序没有错误,能够正常运行,控制台输出程序运行日志,示例如下所示:

14/03/10 22:17:24 INFO job.IPAddressStats: GeoIP file: /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/03/10 22:17:25 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/03/10 22:17:25 INFO Remoting: Starting remoting
14/03/10 22:17:25 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@m1:57379]
14/03/10 22:17:25 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@m1:57379]
14/03/10 22:17:25 INFO spark.SparkEnv: Registering BlockManagerMaster
14/03/10 22:17:25 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140310221725-c1cb
14/03/10 22:17:25 INFO storage.MemoryStore: MemoryStore started with capacity 143.8 MB.
14/03/10 22:17:25 INFO network.ConnectionManager: Bound socket to port 45189 with id = ConnectionManagerId(m1,45189)
14/03/10 22:17:25 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/03/10 22:17:25 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager m1:45189 with 143.8 MB RAM
14/03/10 22:17:25 INFO storage.BlockManagerMaster: Registered BlockManager
14/03/10 22:17:25 INFO spark.HttpServer: Starting HTTP Server
14/03/10 22:17:25 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/03/10 22:17:25 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:49186
14/03/10 22:17:25 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.95.3.56:49186
14/03/10 22:17:25 INFO spark.SparkEnv: Registering MapOutputTracker
14/03/10 22:17:25 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-56c3e30d-a01b-4752-83d1-af1609ab2370
14/03/10 22:17:25 INFO spark.HttpServer: Starting HTTP Server
14/03/10 22:17:25 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/03/10 22:17:25 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:52073
14/03/10 22:17:26 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
14/03/10 22:17:26 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/03/10 22:17:26 INFO ui.SparkUI: Started Spark Web UI at http://m1:4040
14/03/10 22:17:26 INFO spark.SparkContext: Added JAR /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar at http://10.95.3.56:52073/jars/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1394515046396
14/03/10 22:17:26 INFO client.AppClient$ClientActor: Connecting to master spark://m1:7077...
14/03/10 22:17:26 INFO storage.MemoryStore: ensureFreeSpace(60341) called with curMem=0, maxMem=150837657
14/03/10 22:17:26 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 58.9 KB, free 143.8 MB)
14/03/10 22:17:26 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140310221726-0000
14/03/10 22:17:27 INFO client.AppClient$ClientActor: Executor added: app-20140310221726-0000/0 on worker-20140310221648-s1-52544 (s1:52544) with 1 cores
14/03/10 22:17:27 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140310221726-0000/0 on hostPort s1:52544 with 1 cores, 512.0 MB RAM
14/03/10 22:17:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/03/10 22:17:27 WARN snappy.LoadSnappy: Snappy native library not loaded
14/03/10 22:17:27 INFO client.AppClient$ClientActor: Executor updated: app-20140310221726-0000/0 is now RUNNING
14/03/10 22:17:27 INFO mapred.FileInputFormat: Total input paths to process : 1
14/03/10 22:17:27 INFO spark.SparkContext: Starting job: collect at IPAddressStats.java:77
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Registering RDD 4 (reduceByKey at IPAddressStats.java:70)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Got job 0 (collect at IPAddressStats.java:77) with 1 output partitions (allowLocal=false)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Final stage: Stage 0 (collect at IPAddressStats.java:77)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[4] at reduceByKey at IPAddressStats.java:70), which has no missing parents
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[4] at reduceByKey at IPAddressStats.java:70)
14/03/10 22:17:27 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
14/03/10 22:17:28 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@s1:59233/user/Executor#-671170811] with ID 0
14/03/10 22:17:28 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 0 on executor 0: s1 (PROCESS_LOCAL)
14/03/10 22:17:28 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 2396 bytes in 5 ms
14/03/10 22:17:29 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager s1:47282 with 297.0 MB RAM
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Finished TID 0 in 3376 ms on s1 (progress: 0/1)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 0)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Stage 1 (reduceByKey at IPAddressStats.java:70) finished in 4.420 s
14/03/10 22:17:32 INFO scheduler.DAGScheduler: looking for newly runnable stages
14/03/10 22:17:32 INFO scheduler.DAGScheduler: running: Set()
14/03/10 22:17:32 INFO scheduler.DAGScheduler: waiting: Set(Stage 0)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: failed: Set()
14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 1.0 from pool
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Missing parents for Stage 0: List()
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[6] at reduceByKey at IPAddressStats.java:70), which is now runnable
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[6] at reduceByKey at IPAddressStats.java:70)
14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 1 on executor 0: s1 (PROCESS_LOCAL)
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 2255 bytes in 1 ms
14/03/10 22:17:32 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@s1:33534
14/03/10 22:17:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 120 bytes
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Finished TID 1 in 282 ms on s1 (progress: 0/1)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Stage 0 (collect at IPAddressStats.java:77) finished in 0.314 s
14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 0.0 from pool
14/03/10 22:17:32 INFO spark.SparkContext: Job finished: collect at IPAddressStats.java:77, took 4.870958309 s
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 58.246.49.218  312
14/03/10 22:17:32 INFO job.IPAddressStats: [KR] 1.234.83.77  300
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.11.16  212
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 110.85.72.254  207
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 27.150.229.134  185
14/03/10 22:17:32 INFO job.IPAddressStats: [HK] 180.178.52.181  181
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.37.210.212  180
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 222.77.226.83  176
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.11.205  169
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.9.19  165
...

我们也可以通过Web控制台来查看当前执行应用程序(Application)的状态信息,通过Master节点的8080端口(如:http://m1:8080/)就能看到集群的应用程序(Application)状态信息。

另外,需要说明的时候,如果在Unix环境下使用Eclipse使用Java开发Spark应用程序,也能够直接通过Eclipse连接Spark集群,并提交开发的应用程序,然后交给集群去处理。

总结

以上就是本文关于详解Java编写并运行spark应用程序的方法的全部内容,希望对大家有所帮助。有什么问题可以随时留言,小编会及时回复大家。

(0)

相关推荐

  • Spark三种属性配置方式详解

    随着Spark项目的逐渐成熟, 越来越多的可配置参数被添加到Spark中来.在Spark中提供了三个地方用于配置: 1.Spark properties:这个可以控制应用程序的绝大部分属性.并且可以通过 SparkConf对象或者Java 系统属性进行设置: 2.环境变量(Environment variables):这个可以分别对每台机器进行相应的设置,比如IP.这个可以在每台机器的$SPARK_HOME/ conf/spark-env.sh脚本中进行设置: 3.日志:所有的日志相关的属性可以

  • Python中用Spark模块的使用教程

    在日常的编程中,我经常需要标识存在于文本文档中的部件和结构,这些文档包括:日志文件.配置文件.定界的数据以及格式更自由的(但还是半结构化的)报表格式.所有这些文档都拥有它们自己的"小语言",用于规定什么能够出现在文档内.我编写这些非正式解析任务的程序的方法总是有点象大杂烩,其中包括定制状态机.正则表达式以及上下文驱动的字符串测试.这些程序中的模式大概总是这样:"读一些文本,弄清是否可以用它来做些什么,然后可能再多读一些文本,一直尝试下去." 解析器将文档中部件和结构

  • 微软推DreamSpark计划为学生提供免费软件下载地址

    微软推DreamSpark计划为学生提供免费软件 微软公司董事长比尔·盖茨宣布将为全球数百万大学和中学生提供免费的开发和设计工具,以发掘学生的创造潜力,帮助他们踏上学术和职业成功之路.  据国外媒体报道,微软推出的DreamSpark学生计划提供了众多开发和设计软件供学生免费下载,该计划现已向比利时.中国.芬兰.法国.德国.西班牙.瑞典.瑞士.英国和美国的3500万大学生推出.未来6个月内,微软预计将把DreamSpark计划拓展到涵盖澳大利亚.捷克共和国.爱沙尼亚.日本.立陶宛.拉脱维亚.斯洛

  • java 中Spark中将对象序列化存储到hdfs

    java 中Spark中将对象序列化存储到hdfs 摘要: Spark应用中经常会遇到这样一个需求: 需要将JAVA对象序列化并存储到HDFS, 尤其是利用MLlib计算出来的一些模型, 存储到hdfs以便模型可以反复利用. 下面的例子演示了Spark环境下从Hbase读取数据, 生成一个word2vec模型, 存储到hdfs. 废话不多说, 直接贴代码了. spark1.4 + hbase0.98 import org.apache.spark.storage.StorageLevel imp

  • Spark SQL数据加载和保存实例讲解

    一.前置知识详解 Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作, Load:可以创建DataFrame, Save:把DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型. 二.Spark SQL读写数据代码实战 import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD;

  • 详解Java编写并运行spark应用程序的方法

    我们首先提出这样一个简单的需求: 现在要分析某网站的访问日志信息,统计来自不同IP的用户访问的次数,从而通过Geo信息来获得来访用户所在国家地区分布状况.这里我拿我网站的日志记录行示例,如下所示: 121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" &qu

  • 详解Java泛型中类型擦除问题的解决方法

    以前就了解过Java泛型的实现是不完整的,最近在做一些代码重构的时候遇到一些Java泛型类型擦除的问题,简单的来说,Java泛型中所指定的类型在编译时会将其去除,因此List 和 List 在编译成字节码的时候实际上是一样的.因此java泛型只能做到编译期检查的功能,运行期间就不能保证类型安全.我最近遇到的一个问题如下: 假设有两个bean类 /** Test. */ @Data @NoArgsConstructor @AllArgsConstructor public static class

  • 详解java中List中set方法和add方法的区别

    目录 前言 相同点 不同点 总结 前言 在Java中的常用的集合接口List中有两个非常相似的方法: E set(int index, E element); void add(int index, E element); 这两个方法都是在集合的指定位置插入指定的元素,那么这两个方法到底有什么区别呢?接下来我们通过ArrayList这个我们常用集合实现来看一下这两个方法的差异 相同点 首先我们来看一下这两个方法在ArrayList中的相同点 他们都会在集合的指定位置插入新的元素,例如下面的例子:

  • 详解Java类库的概念以及import的使用方法

    Java类库及其组织结构(Java API) Java 官方为开发者提供了很多功能强大的类,这些类被分别放在各个包中,随JDK一起发布,称为Java类库或Java API. API(Application Programming Interface, 应用程序编程接口)是一个通用概念. 例如我编写了一个类,可以获取计算机的各种硬件信息,它很强大很稳定,如果你的项目也需要这样一个功能,那么你就无需再自己编写代码,将我的类拿来直接用就可以.但是,我的类代码很复杂,让你读完这些代码不太现实,而且我也不

  • 详解java中的深拷贝和浅拷贝(clone()方法的重写、使用序列化实现真正的深拷贝)

    1.序列化实现 public class CloneUtils { @SuppressWarnings("unchecked") public static <T extends Serializable> T clone(T object){ T cloneObj = null; try { ByteArrayOutputStream out = new ByteArrayOutputStream(); ObjectOutputStream obs = new Objec

  • 详解Java中使用泛型实现快速排序算法的方法

    快速排序算法概念 快速排序一般基于递归实现.其思路是这样的: 1.选定一个合适的值(理想情况中值最好,但实现中一般使用数组第一个值),称为"枢轴"(pivot). 2.基于这个值,将数组分为两部分,较小的分在左边,较大的分在右边. 3.可以肯定,如此一轮下来,这个枢轴的位置一定在最终位置上. 4.对两个子数组分别重复上述过程,直到每个数组只有一个元素. 5.排序完成. 基本实现方式: public static void quickSort(int[] arr){ qsort(arr,

  • 详解Java编程中Annotation注解对象的使用方法

    注解(也被称为元数据)为我们在代码中添加信息提供了一种形式化的方法,使我们可以在稍后某个时刻非常方便地使用这些数据.   1.基本语法 Java SE5内置三种标准注解 @Override:表示当前的方法定义将覆盖超类中的方法.如果你不小心拼写错误,或者方法签名对不上被覆 盖的方法,编译器就会发出错误提示 @Deprecated:如果程序员使用了注解为它的元素,那么编译器就会发出警告信息 @SupperessWarnings:关闭不当的编译器警告信息. Java SE5内置四种元注解 @Targ

  • 详解Java多线程编程中CountDownLatch阻塞线程的方法

    直译过来就是倒计数(CountDown)门闩(Latch).倒计数不用说,门闩的意思顾名思义就是阻止前进.在这里就是指 CountDownLatch.await() 方法在倒计数为0之前会阻塞当前线程. CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待. CountDownLatch 的作用和 Thread.join() 方法类似,可用于一组线程和另外一组线程的协作.例如,主线程在做一项工作之前需要一系列的准备工作,只有这些准备工

  • 详解Java获取环境变量及系统属性的方法

    环境变量这个概念不陌生, 就是操作系统的环境变量. 系统变量就是java本身维护的变量. 通过 System.getProperty 的方式获取. 对于不同的操作系统来说, 环境变量的处理可能会有一些不统一的地方, 比如说: 不区分大小写 等等. Java 获取环境变量 Java 获取环境变量的方式很简单: System.getEnv()  得到所有的环境变量 System.getEnv(key) 得到某个环境变量的值 Map map = System.getenv(); Iterator it

  • 详解Java中使用ImageIO类对图片进行压缩的方法

    最近做项目需要图片压缩处理,网上找的方法大都使用了 com.sun.image.codec.jpeg.* 这个包中的JPEGImageEncoder类,引入这个包后一直报错,各种google百度,尝试了各种方法,包括手动引jre中的rt.jar,以及在eclipse中把受访问限制的API提示从ERROR改为WARNING,等等,然而这些都是不好使的,因为后来我发现我的java-7-openjdk-amd64中的rt.jar里边根本就没有com.sun.image.*,貌似这个类在java7中已经

随机推荐