Flink stream processing notes (6)
Published: 2019-06-19

Generate and compile the template project

MacBook-Air:SocketWindowWordCount myhaspl$ bash <(curl https://flink.apache.org/q/sbt-quickstart.sh)
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 11510  100 11510    0     0   4499      0  0:00:02  0:00:02 --:--:--  4508
This script creates a Flink project using Scala and SBT.
Project name (Flink Project): SocketWindowWordCount
Organization (org.example): myhaspl.com
Version (0.1-SNAPSHOT):
Scala version (2.11.12):
Flink version (1.6.0):
-----------------------------------------------
Project Name: SocketWindowWordCount
Organization: myhaspl.wordcount
Version: 0.1-SNAPSHOT
Scala version: 2.11.12
Flink version: 1.6.0
-----------------------------------------------
Create Project? (Y/n): y
Creating Flink project under socketwindowwordcount
MacBook-Air:SocketWindowWordCount myhaspl$ ls
socketwindowwordcount
$ cd socketwindowwordcount
$ sbt clean assembly
MacBook-Air:socketwindowwordcount myhaspl$ sbt run
[info] Loading settings for project socketwindowwordcount-build from assembly.sbt ...
[info] Loading project definition from /Users/aaaaaa/Documents/scala/learn_2/socketwindowwordcount/project
[info] Loading settings for project root from idea.sbt,build.sbt ...
[info] Set current project to Flink Project (in build file:/Users/aaaaaaaa/Documents/scala/learn_2/socketwindowwordcount/)
[warn] Multiple main classes detected.  Run 'show discoveredMainClasses' to see the list
Multiple main classes detected, select one to run:
 [1] myhaspl.wordcount.Job
 [2] myhaspl.wordcount.SocketTextStreamWordCount
 [3] myhaspl.wordcount.WordCount
Enter number: 3
[info] (and,1)
[info] (arrows,1)
[info] (be,2)
[info] (is,1)
[info] (nobler,1)
[info] (of,2)
[info] (a,1)
[info] (in,1)
[info] (mind,1)
[info] (or,2)
[info] (slings,1)
[info] (suffer,1)
[info] (against,1)
[info] (arms,1)
[info] (not,1)
[info] (outrageous,1)
[info] (sea,1)
[info] (the,3)
[info] (tis,1)
[info] (troubles,1)
[info] (whether,1)
[info] (fortune,1)
[info] (question,1)
[info] (take,1)
[info] (that,1)
[info] (to,4)
[success] Total time: 8 s, completed Oct 11, 2018 8:56:09 AM
MacBook-Air:learn2 myhaspl$ sbt run
[info] Loading settings for project learn2-build from assembly.sbt ...
[info] Loading project definition from /Users/aaaaaaa/Documents/scala/learn_2/learn2/project
[info] Loading settings for project root from idea.sbt,build.sbt ...
[info] Set current project to Flink Project (in build file:/Users/aaaaaaa/Documents/scala/learn_2/learn2/)
[info] Running (fork) learn
[info] 16
[info] 2 add 5 =7
[info] 2 add 0 =2
[info] 15
[success] Total time: 2 s, completed Oct 11, 2018 11:18:48 AM
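When a project contains several main classes, `sbt run` stops and asks which one to start, as in the log above. This sketch assumes sbt 1.x; a later log in this post shows sbt.version 1.2.4. Under that assumption, you can skip the prompt by naming the class yourself. One way is the command line: `sbt "runMain myhaspl.wordcount.WordCount"`. The other is a setting in build.sbt, for example:

```scala
// build.sbt: fix the main class that `sbt run` starts (sbt 1.x slash syntax)
Compile / run / mainClass := Some("myhaspl.wordcount.WordCount")
```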
MacBook-Air:learn_2 myhaspl$ pwd
/Users/A/Documents/scala/learn_2
MacBook-Air:learn_2 myhaspl$ vim learn_2.scala
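object learn {
  def main(args: Array[String]): Unit = {
    println(myPower(2, 4))        // 16
    println(myAdd(2, 5))          // 2 add 5 =7
    println(myAdd(2))             // y falls back to its default 0: 2 add 0 =2
    println(mySum(1, 2, 3, 4, 5)) // 15
  }

  // x to the power n; t accumulates the product so the recursive call is in tail position
  @annotation.tailrec
  def myPower(x: Int, n: Int, t: Int = 1): Int = {
    if (n < 1) t
    else myPower(x, n - 1, x * t)
  }

  // y has a default value, so myAdd(2) means myAdd(2, 0)
  def myAdd(x: Int, y: Int = 0) = {
    val result: Int = x + y
    s"$x add $y =$result"
  }

  def mySum(nums: Int*) = { // varargs: nums is a Seq[Int] holding every argument
    var sumNum = 0
    for (num <- nums) {
      sumNum += num
    }
    sumNum
  }
}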
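The accumulator t in myPower is what lets @annotation.tailrec succeed: the recursive call is the last thing the method does. The plain-Scala sketch below illustrates this. `LearnSketch` and `powerSteps` are hypothetical names and are not part of the original project. The sketch records each accumulator value on the way to 2^4. It also shows how an existing sequence can be passed to a varargs parameter with `: _*`.

```scala
object LearnSketch {
  // Returns every accumulator value produced while computing x^n,
  // following the same recursion as myPower(x, n, t).
  def powerSteps(x: Int, n: Int): List[Int] = {
    @annotation.tailrec
    def loop(k: Int, acc: Int, seen: List[Int]): List[Int] =
      if (k < 1) (acc :: seen).reverse        // base case: acc now holds x^n
      else loop(k - 1, x * acc, acc :: seen)  // tail call with updated accumulator
    loop(n, 1, Nil)
  }

  // Same contract as mySum, written with foldLeft instead of a mutable var.
  def mySum(nums: Int*): Int = nums.foldLeft(0)(_ + _)

  def main(args: Array[String]): Unit = {
    println(powerSteps(2, 4))  // accumulator values 1, 2, 4, 8, 16
    val xs = Seq(1, 2, 3, 4, 5)
    println(mySum(xs: _*))     // `: _*` expands the Seq into varargs
  }
}
```

The foldLeft version avoids the mutable `var`; both versions return 15 for the arguments 1 to 5.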
MacBook-Air:learn_2 myhaspl$ ls
learn_2.scala
MacBook-Air:learn_2 myhaspl$ bash <(curl https://flink.apache.org/q/sbt-quickstart.sh)
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 11510  100 11510    0     0   3185      0  0:00:03  0:00:03 --:--:--  3189
This script creates a Flink project using Scala and SBT.
Project name (Flink Project): learn2
Organization (org.example): myhaspl
Version (0.1-SNAPSHOT):
Scala version (2.11.12):
Flink version (1.6.0):
-----------------------------------------------
Project Name: learn2
Organization: myhaspl
Version: 0.1-SNAPSHOT
Scala version: 2.11.12
Flink version: 1.6.0
-----------------------------------------------
Create Project? (Y/n): y
Creating Flink project under learn2
MacBook-Air:learn_2 myhaspl$ ls
learn2      learn_2.scala
MacBook-Air:learn_2 myhaspl$ cd learn2
MacBook-Air:learn2 myhaspl$ ls
README      build.sbt   idea.sbt    project     src
MacBook-Air:learn2 myhaspl$ cd sr
-bash: cd: sr: No such file or directory
MacBook-Air:learn2 myhaspl$ cd src
MacBook-Air:src myhaspl$ ls
main
MacBook-Air:src myhaspl$ cd main
MacBook-Air:main myhaspl$ ls
resources   scala
MacBook-Air:main myhaspl$ cd scala
MacBook-Air:scala myhaspl$ ls
myhaspl
MacBook-Air:scala myhaspl$ cd myhaspl
MacBook-Air:myhaspl myhaspl$ ls
Job.scala           WordCount.scala
SocketTextStreamWordCount.scala
MacBook-Air:myhaspl myhaspl$ rm *.scala
MacBook-Air:myhaspl myhaspl$ ls
MacBook-Air:myhaspl myhaspl$ cp /Users/aaaaa/Documents/scala/learn_1/src/learn.scala learn.scala
MacBook-Air:myhaspl myhaspl$ ls
learn.scala
MacBook-Air:myhaspl myhaspl$ pwd
/Users/aaaaa/Documents/scala/learn_2/learn2/src/main/scala/myhaspl
MacBook-Air:myhaspl myhaspl$ sbt clean assembly
MacBook-Air:learn2 myhaspl$ pwd
/Users/aaaaa/Documents/scala/learn_2/learn2
MacBook-Air:learn2 myhaspl$ sbt clean assembly
[info] Updated file /Users/bbbb/Documents/scala/learn_2/learn2/project/build.properties: set sbt.version to 1.2.4
[info] Loading settings for project learn2-build from assembly.sbt ...
[info] Loading project definition from /Users/aaaaa/Documents/scala/learn_2/learn2/project
[info] Updating ProjectRef(uri("file:/Users/aassfdfsdaxg/Documents/scala/learn_2/learn2/project/"), "learn2-build")...
[info] Done updating.
[warn] There may be incompatibilities among your library dependencies.
[warn] Run 'evicted' to see detailed eviction warnings
[info] Loading settings for project root from idea.sbt,build.sbt ...
[info] Set current project to Flink Project (in build file:/Users/cccccc/Documents/scala/learn_2/learn2/)
[success] Total time: 0 s, completed Oct 11, 2018 11:03:29 AM
[info] Updating ...
[info] Done updating.
[info] Compiling 1 Scala source to /Users/AAA/Documents/scala/learn_2/learn2/target/scala-2.11/classes ...
[info] Done compiling.
[info] Checking every *.class/*.jar file's SHA-1.
[info] Merging files...
[info] SHA-1: eaaa2f651ba4387defc6282c2de36e6dd4402f32
[info] Packaging /Users/aaaaaasdf/Documents/scala/learn_2/learn2/target/scala-2.11/Flink Project-assembly-0.1-SNAPSHOT.jar ...
[info] Done packaging.
[success] Total time: 12 s, completed Oct 11, 2018 11:03:41 AM
MacBook-Air:learn2 myhaspl$ sbt
[info] Loading settings for project learn2-build from assembly.sbt ...
[info] Loading project definition from /Users/aaaaaa/Documents/scala/learn_2/learn2/project
[info] Loading settings for project root from idea.sbt,build.sbt ...
[info] Set current project to Flink Project (in build file:/Users/aaaaaa/Documents/scala/learn_2/learn2/)
[info] sbt server started at local:///Users/aaaaaa/.sbt/1.0/server/e759170bdd67731a9bda/sock
sbt:Flink Project> compile
[success] Total time: 1 s, completed Oct 11, 2018 11:27:46 AM
sbt:Flink Project> package
[info] Packaging /Users/aaaaaa/Documents/scala/learn_2/learn2/target/scala-2.11/flink-project_2.11-0.1-SNAPSHOT.jar ...
[info] Done packaging.
[success] Total time: 0 s, completed Oct 11, 2018 11:27:53 AM
MacBook-Air:target myhaspl$ cd scala-2.11
MacBook-Air:scala-2.11 myhaspl$ ls
Flink Project-assembly-0.1-SNAPSHOT.jar flink-project_2.11-0.1-SNAPSHOT.jar
classes                 resolution-cache

Run the official example: it receives text on a socket port and then counts the words.

MacBook-Air:flink myhaspl$ ./libexec/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host MacBook-Air.local.
Starting taskexecutor daemon on host MacBook-Air.local.
MacBook-Air:Documents myhaspl$ nc -l 9800
aa ss dd ff gg
bye
^C
MacBook-Air:flink myhaspl$ flink run libexec/examples/streaming/SocketWindowWordCount.jar  --port 9800
Starting execution of program
Program execution finished
Job with JobID 1625d3a29cfdf8fa8a77c3e5c8e9d30e has finished.
Job Runtime: 9181 ms

Words are counted in 5-second time windows (processing time, tumbling windows) and printed to standard output. Monitor the TaskManager's output file and type some text into nc (input is sent to Flink line by line after you press Return). After starting nc -l 9800 you have to type quickly, within 5 seconds.

MacBook-Air:flink myhaspl$ ls -la libexec/log/*
-rw-r--r--  1 myhaspl  admin  19733 10 11 14:41 libexec/log/flink-myhaspl-standalonesession-0-MacBook-Air.local.log
-rw-r--r--  1 myhaspl  admin      0 10 11 14:41 libexec/log/flink-myhaspl-standalonesession-0-MacBook-Air.local.out
-rw-r--r--  1 myhaspl  admin  32693 10 11 14:41 libexec/log/flink-myhaspl-taskexecutor-0-MacBook-Air.local.log
-rw-r--r--  1 myhaspl  admin     43 10 11 14:41 libexec/log/flink-myhaspl-taskexecutor-0-MacBook-Air.local.out
MacBook-Air:flink myhaspl$ cat libexec/log/flink-myhaspl-taskexecutor-0-MacBook-Air.local.out
aa : 1
gg : 1
ff : 1
dd : 1
ss : 1
bye : 1
MacBook-Air:flink myhaspl$

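Clear out the files under log/, restart the cluster, and then type a few lines of text. Remember to type quickly and finish within 5 seconds. Otherwise no results will show up in the .out file, because that is how the program is configured.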
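The plain-Scala model below shows what a 5-second tumbling window does to the typed input. It has no Flink dependency. It assumes each input line is tagged with the millisecond time at which it arrived. `TumblingModel` and `countPerWindow` are hypothetical names. Each line is assigned to the window that starts at `ts - ts % size`, and words are counted separately per window. As a result, the same word typed in two different 5-second windows is reported twice, each time with its own count.

```scala
object TumblingModel {
  // Counts words per tumbling window of sizeMs milliseconds.
  // Input: (arrivalTimeMs, line). Output: (windowStartMs, word) -> count.
  def countPerWindow(lines: Seq[(Long, String)], sizeMs: Long): Map[(Long, String), Int] =
    lines
      .flatMap { case (ts, line) =>
        val windowStart = ts - ts % sizeMs // the line falls in [windowStart, windowStart + sizeMs)
        line.split("\\s").filter(_.nonEmpty).map(w => (windowStart, w)).toList
      }
      .groupBy(identity)                   // one group per (window, word)
      .map { case (key, hits) => key -> hits.size }

  def main(args: Array[String]): Unit = {
    val input = Seq(1000L -> "aa ss", 3000L -> "ss", 7000L -> "ss gg")
    countPerWindow(input, 5000L).toSeq.sorted.foreach(println)
  }
}
```

With this input, "ss" is counted twice in the window starting at 0 ms and once in the window starting at 5000 ms.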

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCount {

  def main(args: Array[String]): Unit = {

    // the port to connect to
    val port: Int = try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case e: Exception => {
        System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
        return
      }
    }

    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data by connecting to the socket
    val text = env.socketTextStream("localhost", port, '\n')

    // parse the data, group it, window it, and aggregate the counts
    val windowCounts = text
      .flatMap { w => w.split("\\s") }
      .map { w => WordWithCount(w, 1) }
      .keyBy("word")
      .timeWindow(Time.seconds(5), Time.seconds(1)) // 5 s window that slides every 1 s
      .sum("count")

    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1)

    env.execute("Socket Window WordCount")
  }

  // Data type for words with count
  case class WordWithCount(word: String, count: Long)
}
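The listing above passes two arguments to timeWindow. That makes it a sliding window: 5 seconds long, with a new window starting every second. This differs from the tumbling window described earlier. The plain-Scala sketch below models how a timestamp is assigned to sliding windows. Its names are hypothetical, and it assumes a zero window offset. A timestamp belongs to every window [start, start + size) whose start is a multiple of the slide. With a 5 s size and a 1 s slide that is five windows, so a word typed once can appear in up to five printed results.

```scala
object SlidingModel {
  // All sliding windows [start, start + sizeMs) that contain timestamp ts,
  // assuming window starts are aligned to multiples of slideMs (offset 0).
  def windowsFor(ts: Long, sizeMs: Long, slideMs: Long): List[(Long, Long)] = {
    val lastStart = ts - ts % slideMs          // latest aligned start that is <= ts
    Iterator.iterate(lastStart)(_ - slideMs)   // walk back one slide at a time
      .takeWhile(start => start > ts - sizeMs) // stop once the window no longer covers ts
      .map(start => (start, start + sizeMs))
      .toList
  }

  def main(args: Array[String]): Unit =
    windowsFor(7300L, 5000L, 1000L).foreach(println)
}
```

Setting the slide equal to the size collapses this to a single window per timestamp, which is the tumbling case.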
MacBook-Air:flink myhaspl$ ./libexec/bin/stop-cluster.sh
MacBook-Air:flink myhaspl$ ./libexec/bin/start-cluster.sh
MacBook-Air:Documents myhaspl$ nc -l 9800
ss
ss
ff gg ss
^C
MacBook-Air:flink myhaspl$ flink run libexec/examples/streaming/SocketWindowWordCount.jar  --port 9800
Starting execution of program
Program execution finished
Job with JobID 3cda8e474ed0050ef05828fc91cd8302 has finished.
Job Runtime: 3509 ms
MacBook-Air:flink myhaspl$ cat libexec/log/flink-myhaspl-taskexecutor-0-MacBook-Air.local.out
ss : 2
gg : 1
ff : 1
MacBook-Air:flink myhaspl$

Next, let's make the time longer than 5 seconds.

Create a new Flink template project.

MacBook-Air:scala myhaspl$ mkdir learn_3
MacBook-Air:scala myhaspl$ cd learn_3
MacBook-Air:learn_3 myhaspl$ ls
MacBook-Air:learn_3 myhaspl$ bash <(curl https://flink.apache.org/q/sbt-quickstart.sh)
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 11510  100 11510    0     0   5130      0  0:00:02  0:00:02 --:--:--  5274
This script creates a Flink project using Scala and SBT.
Project name (Flink Project): learn3
Organization (org.example): myhaspl
Version (0.1-SNAPSHOT):
Scala version (2.11.12):
Flink version (1.6.0):
-----------------------------------------------
Project Name: learn3
Organization: myhaspl
Version: 0.1-SNAPSHOT
Scala version: 2.11.12
Flink version: 1.6.0
-----------------------------------------------
Create Project? (Y/n): y
Creating Flink project under learn3
MacBook-Air:learn_3 myhaspl$ cd learn3
MacBook-Air:learn3 myhaspl$ ls
README      build.sbt   idea.sbt    project     src
MacBook-Air:learn3 myhaspl$ cd src
MacBook-Air:src myhaspl$ ls
main
MacBook-Air:src myhaspl$ cd main
MacBook-Air:main myhaspl$ ls
resources   scala
MacBook-Air:main myhaspl$ cd scala
MacBook-Air:scala myhaspl$ ls
myhaspl
MacBook-Air:scala myhaspl$ cd myhaspl
MacBook-Air:myhaspl myhaspl$ ls
Job.scala           WordCount.scala
SocketTextStreamWordCount.scala
MacBook-Air:myhaspl myhaspl$ rm W*.scala
MacBook-Air:myhaspl myhaspl$ rm J*.scala
MacBook-Air:myhaspl myhaspl$ ls
SocketTextStreamWordCount.scala
MacBook-Air:myhaspl myhaspl$ pwd
/Users/xxxxx/Documents/scala/learn_3/learn3/src/main/scala/myhaspl
MacBook-Air:myhaspl myhaspl$ cd ../../..
MacBook-Air:src myhaspl$ ls
main
MacBook-Air:src myhaspl$ cd ../..
MacBook-Air:learn_3 myhaspl$ ls
learn3
MacBook-Air:learn_3 myhaspl$ cd learn3
MacBook-Air:learn3 myhaspl$ ls
README      build.sbt   idea.sbt    project     src
MacBook-Air:learn3 myhaspl$ sbt clean assembly
MacBook-Air:learn3 myhaspl$ sbt
[info] Loading settings for project learn3-build from assembly.sbt ...
[info] Loading project definition from /Users/zzzzzz/Documents/scala/learn_3/learn3/project
[info] Loading settings for project root from idea.sbt,build.sbt ...
[info] Set current project to Flink Project (in build file:/Users/zzzzzz/Documents/scala/learn_3/learn3/)
[info] sbt server started at local:///Users/lzz/.sbt/1.0/server/1d8f10f02b1fbf814396/sock
sbt:Flink Project> compile
[success] Total time: 1 s, completed Oct 11, 2018 3:30:19 PM
sbt:Flink Project> package
[info] Packaging /Users/zzz/Documents/scala/learn_3/learn3/target/scala-2.11/flink-project_2.11-0.1-SNAPSHOT.jar ...
[info] Done packaging.
[success] Total time: 0 s, completed Oct 11, 2018 3:30:29 PM
sbt:Flink Project> exit
[info] shutting down server
MacBook-Air:learn3 myhaspl$

Stop the cluster and start it again. This time the time window has been removed, so you can type at your own pace.

MacBook-Air:learn3 myhaspl$ ~/Documents/flink/libexec/bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 29505) on host MacBook-Air.local.
Stopping standalonesession daemon (pid: 29093) on host MacBook-Air.local.
MacBook-Air:learn3 myhaspl$ ls ~/Documents/flink/libexec/log
flink-myhaspl-standalonesession-0-MacBook-Air.local.log flink-myhaspl-taskexecutor-0-MacBook-Air.local.log
flink-myhaspl-standalonesession-0-MacBook-Air.local.out flink-myhaspl-taskexecutor-0-MacBook-Air.local.out
MacBook-Air:learn3 myhaspl$ rm ~/Documents/flink/libexec/log/*
MacBook-Air:learn3 myhaspl$ ls ~/Documents/flink/libexec/log
MacBook-Air:learn3 myhaspl$ flink run  ~/Documents/scala/learn_3/learn3/target/scala-2.11/flink-project_2.11-0.1-SNAPSHOT.jar   127.0.0.1 9800
MacBook-Air:learn3 myhaspl$ ~/Documents/flink/libexec/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host MacBook-Air.local.
Starting taskexecutor daemon on host MacBook-Air.local.
MacBook-Air:learn_3 myhaspl$ nc -l 9800
Unbounded streams have a start but no defined end. They do not terminate and provide data as it is generated. Unbounded streams must be continuously processed, i.e., events must be promptly handled after they have been ingested. It is not possible to wait for all input data to arrive because the input is unbounded and will not be complete at any point in time. Processing unbounded data often requires that events are ingested in a specific order, such as the order in which events occurred, to be able to reason about result completeness.
Bounded streams have a defined start and end. Bounded streams can be processed by ingesting all data before performing any computations. Ordered ingestion is not required to process bounded streams because a bounded data set can always be sorted. Processing of bounded streams is also known as batch processing.
^C
MacBook-Air:learn3 myhaspl$ flink run  ~/Documents/scala/learn_3/learn3/target/scala-2.11/flink-project_2.11-0.1-SNAPSHOT.jar   127.0.0.1 9800
Starting execution of program
Program execution finished
Job with JobID 7f66cd617236d64520a08a44b0544234 has finished.
Job Runtime: 63502 ms
MacBook-Air:learn_3 myhaspl$ ls ~/Documents/flink/libexec/log
flink-myhaspl-standalonesession-0-MacBook-Air.local.log
flink-myhaspl-standalonesession-0-MacBook-Air.local.out
flink-myhaspl-taskexecutor-0-MacBook-Air.local.log
flink-myhaspl-taskexecutor-0-MacBook-Air.local.out
MacBook-Air:learn_3 myhaspl$ cat ~/Documents/flink/libexec/log/flink-myhaspl-taskexecutor-0-MacBook-Air.local.out
...
(bounded,4)
(data,5)
(set,1)
(can,2)
(always,1)
(be,6)
(sorted,1)
(processing,2)
(of,1)
(bounded,5)
(streams,6)
(is,5)
(also,1)
(known,1)
(as,3)
(batch,1)
(processing,3)
(bye,1)
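This post does not show the SocketTextStreamWordCount.scala that was kept from the template. The output hints at what it does: `(bounded,4)` is later followed by `(bounded,5)`, which looks like a running total per word with no window. That is an inference, not the template's verified source. The plain-Scala model below reproduces that behaviour under this assumption. `RunningCountModel` and `runningCounts` are hypothetical names. The model keeps one counter per word and emits one updated (word, total) pair for every word that arrives.

```scala
object RunningCountModel {
  // Emits (word, totalSoFar) for every incoming word, like a keyed sum with no window.
  def runningCounts(words: Seq[String]): List[(String, Int)] = {
    val start = (List.empty[(String, Int)], Map.empty[String, Int])
    val (emitted, _) = words.foldLeft(start) { case ((out, counts), w) =>
      val total = counts.getOrElse(w, 0) + 1 // update this word's counter
      ((w, total) :: out, counts.updated(w, total))
    }
    emitted.reverse                          // restore arrival order
  }

  def main(args: Array[String]): Unit = {
    val text = "Bounded streams have a defined start and end. Bounded streams can be processed"
    val words = text.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq
    runningCounts(words).foreach(println)
  }
}
```

Because nothing ever closes a window here, every word produces output as soon as it arrives, which is why typing slowly no longer loses results.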

Start the Flink Scala REPL

MacBook-Air:learn3 myhaspl$ ~/Documents/flink/libexec/bin/start-scala-shell.sh local
Starting Flink Shell:
log4j:WARN No appenders could be found for logger (org.apache.flink.configuration.GlobalConfiguration).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Starting local Flink cluster (host: localhost, port: 8081).
Connecting to Flink cluster (host: localhost, port: 8081).
[Flink squirrel ASCII-art logo]
              F L I N K - S C A L A - S H E L L
NOTE: Use the prebound Execution Environments to implement batch or streaming programs.
  Batch - Use the 'benv' variable
    * val dataSet = benv.readTextFile("/path/to/data")
    * dataSet.writeAsText("/path/to/output")
    * benv.execute("My batch program")
    HINT: You can use print() on a DataSet to print the contents to the shell.
  Streaming - Use the 'senv' variable
    * val dataStream = senv.fromElements(1, 2, 3, 4)
    * dataStream.countWindowAll(2).sum(0).print()
    * senv.execute("My streaming program")
    HINT: You can only print a DataStream to the shell in local mode.
scala>
scala> val text = benv.fromElements(
     |   "To be, or not to be,--that is the question:--",
     |   "Whether 'tis nobler in the mind to suffer",
     |   "The slings and arrows of outrageous fortune",
     |   "Or to take arms against a sea of troubles,")
scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
counts: org.apache.flink.api.scala.AggregateDataSet[(String, Int)] = org.apache.flink.api.scala.AggregateDataSet@254dfd34
scala> counts.print()
(a,1)
(against,1)
(and,1)
(arms,1)
(arrows,1)
(be,2)
(fortune,1)
(in,1)
(is,1)
(mind,1)
(nobler,1)
(not,1)
(of,2)
(or,2)
(outrageous,1)
(question,1)
(sea,1)
(slings,1)
(suffer,1)
(take,1)
(that,1)
(the,3)
(tis,1)
(to,4)
(troubles,1)
(whether,1)
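The DataSet pipeline `flatMap` → `map { (_, 1) }` → `groupBy(0)` → `sum(1)` has a direct counterpart in ordinary Scala collections. That counterpart can help when checking what the REPL should print. The sketch below is plain Scala, not Flink, and `BatchCountModel` is a hypothetical name. It also drops empty tokens. `split("\\W+")` yields an empty first element when a line starts with a non-word character, and the REPL version above would count that empty string as a word.

```scala
object BatchCountModel {
  // Same steps as the DataSet job: tokenize, pair each word with 1,
  // group by the word (field 0), then sum field 1 within each group.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.toLowerCase.split("\\W+").toList)
      .filter(_.nonEmpty)                // guard against a leading empty token
      .map(w => (w, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    val text = Seq(
      "To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer",
      "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,")
    wordCount(text).toSeq.sorted.foreach(println)
  }
}
```

For these four lines the model yields the same 26 (word, count) pairs as the REPL session, for example (to,4) and (the,3).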

The print() call automatically submits the job to the JobManager for execution and shows the results in the terminal.

You can also write the results to a file. In that case, however, you need to call execute to run your program:

benv.execute("MyProgram")
scala> val text = benv.fromElements("Each task slot represents a fixed subset of resources of the TaskManager. A TaskManager with three slots, for example, will dedicate 1/3 of its managed memory to each slot. Slotting the resources means that a subtask will not compete with subtasks from other jobs for managed memory, but instead has a certain amount of reserved managed memory. Note that no CPU isolation happens here; currently slots only separate the managed memory of tasks.")
text: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@6c551798
scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
counts: org.apache.flink.api.scala.AggregateDataSet[(String, Int)] = org.apache.flink.api.scala.AggregateDataSet@2be71a78
scala> counts.print()
(1,1)
(3,1)
(a,4)
(amount,1)
(but,1)
(certain,1)
(compete,1)
(cpu,1)
(currently,1)
(dedicate,1)
(each,2)
(example,1)
(fixed,1)
(for,2)
(from,1)
(happens,1)
(has,1)
(here,1)
(instead,1)
(isolation,1)
(its,1)
(jobs,1)
(managed,4)
(means,1)
(memory,4)
(no,1)
(not,1)
(note,1)
(of,5)
(only,1)
(other,1)
(represents,1)
(reserved,1)
(resources,2)
(separate,1)
(slot,2)
(slots,2)
(slotting,1)
(subset,1)
(subtask,1)
(subtasks,1)
(task,1)
(taskmanager,2)
(tasks,1)
(that,2)
(the,3)
(three,1)
(to,1)
(will,2)
(with,2)
scala> :q
 good bye ..
MacBook-Air:learn3 myhaspl$

Reposted from: https://blog.51cto.com/13959448/2316212
