麻绳先生

做一些记录性的工作

PyQt5小程序上传图片到七牛云

github地址
https://github.com/zkalan/qiniu-image.git

绘制软件界面

使用Qt Designer绘制程序界面,设置图标,修改控件id等。这个过程还包括几个配置步骤:

  • 在pycharm中安装PyQt5
  • 在pycharm中安装PyQt5-tools
  • 配置pycharm,方便的通过tools界面启动dedigner和其他工具

slots和signals

这是两个PyQt中的概念,signal就是信号,是动作的发出者;slot就是槽,是动作的相应者。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# -*- coding: utf-8 -*-
import sys, os
if hasattr(sys, 'frozen'):
os.environ['PATH'] = sys._MEIPASS + ";" + os.environ['PATH']
from ui import mainwindow
from PyQt5 import QtGui, QtCore
from PyQt5.QtWidgets import QMainWindow, QApplication, QMessageBox, QFileDialog
from utils import uploadtoqiniu

class WindowService(QMainWindow, mainwindow.Ui_MainWindow):
def __init__(self):
super(WindowService, self).__init__()
self.setupUi(self)
# bind slots and signals
self.pb_select.clicked.connect(self.pb_select_service)

self.pb_copy_url.clicked.connect(self.pb_copy_url_service)
self.pb_copy_markdown.clicked.connect(self.pb_copy_markdown_service)
self.pb_upload.clicked.connect(self.pb_upload_service)

def pb_select_service(self):
filename, filetype = QFileDialog.getOpenFileName(self,
'选择文件',
os.getcwd(),
"Images (*.jpg;*.jpeg;*.gif;*.png;*.ico);;All Files (*)")
if filename == '':
print('filename is' + filename)
return
self.le_selectimage.setText(filename)
print('filename is' + filename)
print('文件筛选器类型:', filetype)

def pb_upload_service(self):
filepath = self.le_selectimage.text()
if filepath == '':
self.maessage_box('select a image')
else:
if os.path.exists('settings.cfg'):
back_url = uploadtoqiniu.upload(filepath)
self.le_url.setText(back_url)
self.le_md.setText('![](' + back_url + ')')
img = QtGui.QImage(filepath)
scale = img.size().width()/411 if ((img.size().width()/411) > (img.size().height()/201)) else img.size().height()/201
scale_width = img.size().width()/scale
scale_height = img.size().height() / scale
size = QtCore.QSize(scale_width, scale_height)
image = QtGui.QPixmap.fromImage(img.scaled(size, QtCore.Qt.IgnoreAspectRatio))
self.l_preview.setPixmap(image)
print(image.size())
print(type(image.size()))
self.maessage_box('Success')
else:
self.maessage_box('check setting.cfg')

def pb_copy_url_service(self):
clipboard = QApplication.clipboard()
url = self.le_url.text()
clipboard.setText(url)
self.maessage_box('copy to clipkboard')

def pb_copy_markdown_service(self):
clipboard = QApplication.clipboard()
md_url = self.le_md.text()
clipboard.setText(md_url)
self.maessage_box('copy to clipkboard')

def maessage_box(self, message, title='information'):
box = QMessageBox(self)
box.setIcon(QMessageBox.Information)
box.setText(message)
box.setWindowTitle(title)
box.setStandardButtons(QMessageBox.Ok)
box.button(QMessageBox.Ok).animateClick(2*1000)
box.exec_()

使用pyinstaller打包

pyinstaller打包总是问题百出,PyQt5自己也有不少bug。遇到这些问题

ImportError: unable to find Qt5Core.dll on PATH

解决办法如下

https://blog.csdn.net/zwyact/article/details/99778898
在主程序中pyqt5库import之前就对系统变量进行手动设置

1
2
3
import sys, os
if hasattr(sys, 'frozen'):
os.environ['PATH'] = sys._MEIPASS + ";" + os.environ['PATH']

NameError: name ‘exit’ is not defined

解决办法就是这样写sys.exit

效果预览

settings.cfg是配置文件,其中需要配置各个字段。可以配置是否使用水印。

主界面

上传图片

上传完图片还能快速获取访问链接。

github地址
https://github.com/zkalan/qiniu-image.git

(no title)

SparkSQL介绍

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了两个编程抽象:DataSet和DataFrame。

Hive SQL转化为MapReduce,然后提交给集群执行,但是MapReduce计算模型执行效率太低,于是诞生了Spark SQL。

将Spark SQL转化为RDD,然后提交给集群执行,效率很高!

RDD DataSet DataFrame

DataFrame是一个分布式数据容器,更像传统关系数据库的二维表。DataSet是DataFrame API最新扩展,是强类型的数据结构,类似于类的概念。

SparkSQL编程

SparkSession

SparkSession是SparkContext和HiveContext的结合,实际计算由SparkContext负责。

val df = spark.read.json(filepath)

org.sparl.sql.DataFrame=[age:bigint,name:string]

df.show

df.createTempView()

spark.sql(“select * from student”).show

df.createGlobalTempView()

DSL风格语法

df.printSchema

df.select(“name”).show()

import spark,implicits._

DataSet基本操作

df.rdd

自定义用户聚合函数,有强类型和弱类型。

(no title)

Java虚拟机——内存区域与内存溢出异常(一)

运行时数据区

运行时数据区

程序计数器

每条线程都需要有一个独立的程序计数器,各条线程之间计数器互不影响,独立存储,被称为线程私有内存。

如果线程正在执行的是一个Java方法,这个计数器记录的时正在执行的虚拟机字节码指令的地址;如果正在执行的时Native方法,这个计数器值则为空(Undefined)。此内存区域是唯一一个在Java虚拟机规范中没有规定任何OutOfMemoryError情况的区域。

Java虚拟机栈

也是线程私有,它的生命周期与线程相同。虚拟机栈描述的是Java方法执行的内存模型:每个方法执行都会创建一个栈帧(Stack Frame)用于存储局部变量表、操作数栈、动态链接、方法出口等信息。常说的“栈”一般指的是虚拟机栈中的局部变量表。

局部变量表u从南方了编译期可知的各类基本数据类型、对象引用、returnAddress类型。

局部变量表所需的内存空间在编译期间完成分配,在方法运行期间不会改变大小,但所处的栈顶可能动态扩展。

如果线程请求的栈深度大于虚拟机所允许的深度,将抛出StackOverflowError异常;如果虚拟机栈动态扩展时无法申请足够的内存,就会抛出OutOfMemoryError异常。

本地方法栈

虚拟机栈为虚拟机执行Java方法服务,也就是字节码,而本地方法栈则为虚拟机使用到的Native方法服务。

Java堆

所有线程共享的一块区域。

此内存区域的唯一目的就是存放对象实例,几乎所有的对象实例都在这里分配内存。

Java堆是垃圾收集管理的主要区域,因此还被称为“GC堆”(Garbage Collected Heap)。

根据虚拟机规范的规定,Java堆可以处于物理上不连续的内存空间中,只需要逻辑连续;在实现时,既可以是固定大小的,也可以是可扩展的,通过-Xmx和-Xms控制。

方法区

线程共享区域。主要用于存储已被虚拟机加载的类信息、常量、静态变量、即时编译器编译后的代码等数据。虽然Java虚拟机将方法区描述为堆的逻辑部分,但是他还有一个名字叫做Non-Heap,目的应该是与堆进行区分。

Java虚拟机规范对方法区的限制非常宽松,除了和Java堆一样不需要连续的存储空间,还可以选择不实现垃圾回收

运行时常量池

是方法区的一部分。Class文件中除了有类的版本、字段、方法、接口等描述信息外,还有一项信息是常量池(Constant Pool Table),用于存放编译期生成的各种字面量和符号引用,这部分信息在类加载后进入方法去的运行时常量池存放。

运行时常量池相对于Class文件常量池的另外的一个重要特征就是具备动态性,java语言并不要求常量只有编译期才能产生,也就是并非预置入Class文件中常量池的内容才能进入方法区运行时常量池,运行期间也可能有新的常量放入池中,这种特性被开发人员利用较多的就是String.intern()。

直接内存

Direct Memory并不是虚拟机运行时数据区的一部分,也不是Java虚拟机规范中定义的内存区域,但是这部分内存使用频繁,也可能导致OOM异常。

NIO类引入了一种基于通道和缓冲区的IO方式,可以使用Native函数库直接分配堆外内存,然后通过一个存储在Java堆中的DirectByteBuffer对象作为这块内存的引用进行操作,这样能在一定程度提升性能,避免了在Java堆和Native堆中来回复制数据。

直接内存的分配不受Java堆大小限制,所以-Xmx等参数信息再加直接内存可能导致总和大于物理内存限制,导致OOM异常。

spark第一次笔记

hadoop历史

2011年发布1.0版本,2012年发布稳定版,2013发布2.x版本(Yarn)

缺点

  • 基于数据集的计算,所以面向数据。从存储介质中获取数据,然后进行计算,最后将结果存储到介质中。所以主要应用于一次性计算,不适合数据挖掘和机器学习这些迭代计算和图形挖掘计算。
  • mapreduce基于文件存储介质的操作,性能差
  • mapreduce和hadoop紧密耦合

Yarn版本

ResourceManager、ApplicationMaster、Driver、NodeManager

spark历史

2013年6月正式发布,spark基于hadoop1.x架构思想设计思想。spark计算基于内存,并且基于Scala语法开发,所以天生适合迭代式计算。

HDFS-Yarn-Spark

Spark下载地址

http://spark.apache.org

重要角色

Driver(驱动器)

Executer(执行器)

Spark-Yarn运行模式简图

Spark-Yarn运行模式简图

Standalone模式

RDD

RDD(Resilient Distributed Dataset)称为弹性分布式数据集,是Spark中最基本的数据或计算抽象。代码中是一个抽象类,代表一个不可变、可分区、里面的元素可进行并行计算的集合。

RDD属性

  • 一组分区Partition,即数据集的基本组成单位
  • 一个计算每个分区的函数
  • RDD之间的依赖关系
  • 一个Partitioner,即RDD的分片函数
  • 一个列表,存储存取每个Partiton的优先位置preferred location

大数据窍门:移动数据不如移动计算

RDD特点

RDD表示只读的分区的数据集,对RDD进行改动,只能通过RDD的转换操作,由一个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必须的信息,RDDs之间存在依赖,RDD的执行是按照血缘关系延时计算的。如果血缘关系较长,可以通过持久化RDD来切断血缘关系。

Spark中所有RDD方法都称为算子,共分为两大类,转化算子和行动算子。

缓存

如果应用程序多次使用同一个RDD,可以将该RDD缓存起来。

RDD的创建

Spark中创建RDD的方法有三种:从集合中创建、从外部存储创建、从其他RDD创建。

集合创建

1
2
3
//前者调用后者
sc.makeRDD(List(1, 2, 3, 4))
sc.parallelize(Array(1, 2, 3, 4))

外部存储创建

1
2
3
//默认情况下,可以读取项目路径,也可以读取其他HDFS路径
val value:RDD[String] = sc.textField("in")
//涉及hadoop读取文件的分片规则

RDD的转换

Value类型

map(function)

返回一个新的RDD,该RDD由每一个输入元素经过function函数转换后组成。例如通过一个数组RDD得到一个每个元素被乘以2的新的RDD。

mapPartitions

mapPartitions效率优于map算子,减少了发送到执行器执行交互次数。但是可能会出现内存溢出OOM。一次处理一个分区的数据。

mapPartitionsWithIndex(func)

作用是,func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) -> Iterator[U]。

Driver和Executor关系

Driver就是创建Spark上下文对象的应用程序,Executor就是用于提取任务并执行。

flatMap
glom

将每一个分区形成一个数组,形成新的RDD类型为RDD[Array[T]]。

groupBy & filter

分组,按照传入函数的返回值进行分组,将相同的key对应的值放入一个迭代器。

sample(withRepalcement, fraction, seed)

以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示抽出的数据是否放回,true表示有放回,seed用于指定随机数生成器种子。

distinct
coalesce
repartition & sortBy

双Value类型交互

union
cartesian

Key-Value类型

partitionBy
groupBykey
reduceBykey
aggregateBykey
foldByKey & combinByKey
sortByKey & mapValueByKey

方法不同,底层实现和代码的执行位置很不同,
是的Driver执行还是在Executor执行?

RDD的依赖

窄依赖

窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用,窄依赖被比喻为独生子女。

宽依赖

宽依赖指的是每一个父RDD的Partition可以被多个子RDD的Partition使用。

DAG

任务划分

宽依赖放在不同的stage,窄依赖放在同一个stage。

  1. Application:初始化一个SparkContext即生成一个Application;
  2. Job:一个Action算子就会生成一个Job;
  3. Stage:根据RDD依赖关系,将Job划分成不同的Stage,一个宽依赖就划分一个新的Stage;
  4. Task:Stage是一个TaskSet,将Stage划分的结果发送到不同Executor执行即为一个Task;
  5. 上述四者每一层都是一对n的关系;

RDD的缓存

通过persist方法或cache方法可以将前面的计算结果缓存,默认情况下persist()会把数据以序列化的形式缓存在JVM的堆空间中。但是并不是这两个方法调用时立刻缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,供后面重用。

此外还有checkpoint功能,通过函数setCheckpointDir(dir)设置。

一般在血缘关系比较长的时候使用。

RDD分区器

Spark三大数据结构

  • RDD:分布式数据集
  • 广播变量:分布式只读共享变量 broadcast,调优策略
  • 累加器:分布式只写共享变量,LongAccumulator

Spark中的数量

  • Executor:默认有2个;可以通过提交参数设置;
  • partition:默认情况,读取文件采用Hadoop的切片规则,如果读取内存中的数据,可以根据特定的算法设定,可以通过其他算子进行改变;多个阶段的场合,下一个阶段分区的数量受到上一个阶段分区的影响;
  • Stage:ResultStage + Shuffle依赖的数量,划分阶段的目的就是为了任务执行的等待,因为Shuffle的过程需要落盘;
  • Task:原则上一个分区就是一个任务,但实际中,可以动态调整;

javaCV初步使用之rtmp推流和在线播放

从设备获取视频流

参考链接
https://blog.csdn.net/eguid_1/article/details/52678775

这块内容主要是javaCV基本API的使用,代码非常简单,我将其写在了一个类中,位于
src/main/java/com/zkalan/capture下,然后再demo类中调用测试;需要注意的是,网
上的很多代码将画板canvas的销毁方式设置为EXIT_ON_CLOSE,这会导致java虚拟机直接
推出,资源无法释放。我认为正确的写法应该是DISPOSED_ON_CLOSE。

1
canvas.setDefaultCloseOperation(WindowConstants.DISPOSE_ON_CLOSE);

搭建rtmp推流服务器

通过nginx服务器和nginx-rtmp-module可以简单的搭建一个rtmp服务器,我将使用的nginx
编译版本放在了这里
,这是已经包含了nginx-rtmp-module的版本,使用非常简单,首先在配置文件,例如
nginx-win-rtmp.conf中,添加一段rtmp配置项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
rtmp {
server {
listen 1935;
chunk_size 4000;
#mylive就是直播项目名
application mylive {
live on;

# record first 1K of stream
record all;
record_path /tmp/av;
record_max_size 1K;

# append current timestamp to each flv
record_unique on;

# publish only from localhost
#allow publish 127.0.0.1;
#deny publish all;

#allow play all;
}
}
}

然后在命令行中启动nginx服务器,例如命令

1
start nginx.exe -c conf\nginx-win-rtmp.conf

最后,启动第一部分写好的demo程序,注意publish address应该是这样的格式
rtmp://localhost:port/mylive/,使用potplayer或者vlc访问该链接,测试推流是否成功。

在网页播放rtmp流

参考链接
https://blog.csdn.net/qq_30152271/article/details/84334734

操作起来很简单,就是创建一个静态网页,放到服务器目录下,启动任何一个静态服务器,例如
上文的nginx,访问该静态网页,只要地址填写正确,就可以观看推流内容了。

然而需要注意的是,videojs虽然声称是一个html5播放器,但它的5.x版本播放rtmp流时依然需要flash 支持,并且6.x及以后版本不支持rtmp播放,也许是为了真正的叫做“html5播放器”?

Demo地址

javaCV-rtmp-demo

浏览器效果图