

[Java基础教程]第十三章-Java多线程
source link: https://blogread.cn/it/article/7900?f=hot1
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

[Java基础教程]第十三章-Java多线程
在前面我们介绍的客户端与服务器端通信的小程序,当一个客户端退出通信时服务器端也结束了,不满足我们对于服务器端的定位。通常服务器应该可以同时接受多个客户端的链接,并且客户端结束后服务器端应该正常运行。所以服务器端接受链接的代码应该一直接受新的客户链接,每个客户有一个独立的程序处理,相互之间不影响,这种场景成为并发。那么什么是并发,我们一起了解一下。
在介绍并发之前我们先了解一下串行和并行:
热闹的景点,买票人很多,这时只有一个窗口售票,大家排队依次买票就可以理解为串行。
排队人太多了,旁边又加开了几个窗口,多人在不同的窗口同时买票可以理解为并行。
如果只能开一个窗口,这时好多着急的人围上来,有问价格的,有掏钱的,又有取票的,在这个过程中售票员在同时应对多个买票人,可以理解为并发。
我们经常在计算机上一边听歌一边写文档(或者处理其他的事情),这就是一种并发行为,表面看两个程序是同时进行,为什么不是并行呢?计算机只有一个CPU所以只能支持一个线程运行。有人拍砖说:我家里计算机是多核CPU可以同时支持多个线程运行,确实是这样,为此我也特地去百度了一下有如下几点认知:
1、虽然是多核CPU但是,系统总线,内存是共用的,在加载内存数据时仍然需要串行访问。
2、目前的程序设计语言仍然是过程型开发,没有和好的方法能自动的切割任务使并行计算。
3、操作系统在线程调度时随着内核的增加复杂性递增,目前最多支持8核
所以基于以上认知,我们在讨论并发和同步这个问题时仍然按照CPU单核来讨论。
那么计算机是如何做到一边播放歌曲一边支持文档编辑呢?操作系统会把CPU的执行时间划分微妙级别的时间片段,每一个时间片内去调度一个线程执行,多个线程不断的切换执行,因此在人类可感知的时间段(秒级)内线程是同时执行的,所以多个线程在某个时间段内的同时执行就是并发。串行、并行和并发如下图所示:

Java对并发场景提供了线程类Runnable接口和Thread类支持,下面是重构后的服务器端代码:
publicstaticvoidmain(String[] args) throwsIOException { ServerSocket serverSocket = newServerSocket(8080); System.out.println("监听IP:"+ serverSocket.getInetAddress().getLocalHost().getHostAddress() + "; 监听端口:" + serverSocket.getLocalPort()); inti = 0; // 主线程一直在等待客户端连接 while(true) { Socket acceptSocket = serverSocket.accept(); // 接受到一个客户请求就会创建一个新线程处理 String sessionName = "session"+ i; Thread thread = newSessionProcessor(sessionName, acceptSocket); thread.start(); } } publicclassSessionProcessor extendsThread { /** * 会话名称 */ privateString name; /** * 与客户端连接的socket */ privateSocket acceptSocket; publicSessionProcessor(String name, Socket acceptSocket) { this.name = name; this.acceptSocket = acceptSocket; } /* (non-Javadoc) * @see java.lang.Runnable#run() */ @Override publicvoidrun() { InputStreamReader isr = null; BufferedReader br = null; try{ InputStream inputStream = acceptSocket.getInputStream(); isr = newInputStreamReader(inputStream, "utf-8"); br = newBufferedReader(isr); String line = null; while((line = br.readLine()) != null) { System.out.println(name + "客户输入:"+ line); } } catch(UnsupportedEncodingException e) { e.printStackTrace(); } catch(IOException e) { e.printStackTrace(); } finally{ try{ if(br != null) { br.close(); } if(isr != null) { isr.close(); } } catch(IOException e) { e.printStackTrace(); } } } }
main方法启动的线程,做为主线程一直在监听新的链接,每当有新的用户链接进来会创建一个新的线程”Thread thread = new SessionProcessor(sessionName, acceptSocket);”,并启动线程”thread.start()”.
新创建的线程执行逻辑是我们在继承了Thread的类SessionProcessor的run方法。从socket中获取输入流,读取客户端发送的字符串。
多线程实现的关键点:
1、继承Thread类或者实现Runnable接口,并重写run方法
2、创建线程并调用start方法。
为了以后方便查询,我们把聊天记录存储到文件中,新增ChatLogManager用于管理聊天记录的存储,每个SessionProcessor收到消息后调用ChatLogManager的save(String)方法,为了避免频繁打开聊天记录文件,我们先把聊天记录缓存到List中,每10条保存一次,代码重构如下:
publicclassChatLogManager { privateString fileName = "chatlog.txt"; privateList<String> tempChatLogList = newArrayList<>(); privateintmaxSize = 10; publicvoidsave(String chatLog) throwsIOException { // 因为文件访问速度慢,所以我们积累10条记录,写入一次文件 tempChatLogList.add(chatLog); if(tempChatLogList.size() == maxSize) { File chatLogFile = newFile(fileName); if(!chatLogFile.exists()) { chatLogFile.createNewFile(); } FileOutputStream fos = newFileOutputStream(chatLogFile, true); OutputStreamWriter osw = newOutputStreamWriter(fos); BufferedWriter bw = newBufferedWriter(osw); for(inti = 0; i < maxSize; i++) { bw.append(tempChatLogList.get(i) + "n"); } // 记录到文件后清空列表 tempChatLogList.clear(); bw.flush(); bw.close(); osw.close(); fos.close(); } } } //监听线程创建SessionProcessor新增ChatLogManager入参 publicstaticvoidmain(String[] args) throwsIOException { ServerSocket serverSocket = newServerSocket(8080); System.out.println("监听IP:"+ serverSocket.getInetAddress().getLocalHost().getHostAddress() + "; 监听端口:" + serverSocket.getLocalPort()); inti = 0; ChatLogManager manager = newChatLogManager(); // 主线程一直在等待客户端连接 while(true) { Socket acceptSocket = serverSocket.accept(); // 接受到一个客户请求就会创建一个新线程处理 String sessionName = "session"+ i; Thread thread = newSessionProcessor(sessionName, acceptSocket, manager); thread.start(); } } //SessionProcessor新增ChatLogManager属性和构造方法入参 privateChatLogManager manager; publicSessionProcessor(String name, Socket acceptSocket, ChatLogManager manager) { this.name = name; this.acceptSocket = acceptSocket; this.manager = manager; } //SessionProcessor的run方法收到消息后调用ChatLogManager的save方法 br = newBufferedReader(isr); String line = null; while((line = br.readLine()) != null) { manager.save(name + "客户输入:"+ line); } System.out.println("接受完毕");
使用telnet链接测试一下,聊天记录保存到工程根目录下的chatlog.txt,记得一定要输入10条的倍数,否则会有数据不能保存的。
为了更真实的模拟现实场景我们使用代码模拟10个用户同时访问的情景,创建线程类SenderProcessor,用于连接服务器端,并创建一个main方法创建10个线程,并启动,代码如下:
publicclassSenderProcessor extendsThread { @Override publicvoidrun() { try{ Socket socket = newSocket("127.0.0.1", 8080); OutputStream outputStream = socket.getOutputStream(); BufferedWriter writer = newBufferedWriter(newOutputStreamWriter(outputStream)); for(inti = 0; i < 10; i++) { writer.write("发送的第"+ i + "句话"); writer.newLine(); writer.flush(); } writer.close(); outputStream.close(); socket.close(); System.out.println("输出完毕"); } catch(UnknownHostException e) { e.printStackTrace(); } catch(IOException e) { e.printStackTrace(); } } } publicstaticvoidmain(String[] args) { for(inti = 0; i < 10; i++) { Thread thread = newSenderProcessor(); thread.start(); } }
好了,先运行Server的main方法,再运行Client的main方法,从client的10个线程共发送了100句聊天(每个线程10个聊天记录),但是查看chatlog.txt,发现保存的数据少了,多运行几次(注意每次把文件清空),发现每次缺少的数据个数和数据内容都不相同,为什么呢?还记得前面介绍的并发知识吗,同一时刻只有一个线程在执行,多线程时每个线程只是在非常小的时间内执行,其他非执行线程会等待,所以线程的执行时断断续续的,假如其中一个线程A在ChatLogManager的”tempChatLogList.clear();”被切换到另外一个线程B执行,B执行的是“tempChatLogList.add(chatLog);”,刚执行完,又切换到了A,然后”tempChatLogList.clear();”被执行了,B添加的聊天记录被清空了。此时就需要进行同步处理,多个线程共享资源(这里指文件和缓存List),为了解决资源竞争,需要保持线程串行访问共享资源,Java中关键字”synchronized”和接口“java.util.concurrent.locks.Lock”相关的类可以解决这种问题。这里我们使用关键字”synchronized”,代码如下:
publicsynchronizedvoidsave(String chatLog) throwsIOException { // 因为文件访问速度慢,所以我们积累10条记录,写入一次文件 tempChatLogList.add(chatLog); if(tempChatLogList.size() == maxSize) { File chatLogFile = newFile(fileName); if(!chatLogFile.exists()) { chatLogFile.createNewFile(); } FileOutputStream fos = newFileOutputStream(chatLogFile, true); OutputStreamWriter osw = newOutputStreamWriter(fos); BufferedWriter bw = newBufferedWriter(osw); for(inti = 0; i < maxSize; i++) { bw.append(tempChatLogList.get(i) + "n"); } // 记录到文件后清空列表 tempChatLogList.clear(); bw.flush(); bw.close(); osw.close(); fos.close(); } }
再次运行,就会发现慢慢的100个聊天记录都在文件中了。多线程能很大的提升程序员运行的效率,但是如果处理不好同步会得到不一样的预期结果。所以线程有风险,使用需谨慎。
小练习:
使用多线程分析该工程中所有的Java文件中a-z出现的次数,不区分大小写。
课程中的代码:
packagecom.sunhaojie.learntest.thirteenth; importjava.io.BufferedWriter; importjava.io.File; importjava.io.FileOutputStream; importjava.io.IOException; importjava.io.OutputStreamWriter; importjava.util.ArrayList; importjava.util.List; /** * @ClassName ChatLogManager * @Description 聊天记录管理类 * * @author sunhaojie [email protected] * @date 2016年2月13日 下午10:07:43 */ publicclassChatLogManager { privateString fileName = "chatlog.txt"; privateList<String> tempChatLogList = newArrayList<>(); privateintmaxSize = 10; publicsynchronizedvoidsave(String chatLog) throwsIOException { // 因为文件访问速度慢,所以我们积累10条记录,写入一次文件 tempChatLogList.add(chatLog); if(tempChatLogList.size() == maxSize) { File chatLogFile = newFile(fileName); if(!chatLogFile.exists()) { chatLogFile.createNewFile(); } FileOutputStream fos = newFileOutputStream(chatLogFile, true); OutputStreamWriter osw = newOutputStreamWriter(fos); BufferedWriter bw = newBufferedWriter(osw); for(inti = 0; i < maxSize; i++) { bw.append(tempChatLogList.get(i) + "n"); } // 记录到文件后清空列表 tempChatLogList.clear(); bw.flush(); bw.close(); osw.close(); fos.close(); } } } packagecom.sunhaojie.learntest.thirteenth; importjava.io.BufferedReader; importjava.io.IOException; importjava.io.InputStream; importjava.io.InputStreamReader; importjava.io.UnsupportedEncodingException; importjava.net.Socket; /** * @ClassName SessionProcessor * @Description 用户会话 * * @author sunhaojie [email protected] * @date 2016年2月13日 下午2:07:31 */ publicclassSessionProcessor extendsThread { /** * 会话名称 */ privateString name; /** * 与客户端连接的socket */ privateSocket acceptSocket; /** * 聊天记录管理类 */ privateChatLogManager manager; publicSessionProcessor(String name, Socket acceptSocket, ChatLogManager manager) { this.name = name; this.acceptSocket = acceptSocket; this.manager = manager; } /* (non-Javadoc) * @see java.lang.Runnable#run() */ @Override publicvoidrun() { InputStreamReader isr = null; BufferedReader br = null; try{ InputStream inputStream = acceptSocket.getInputStream(); isr = newInputStreamReader(inputStream, "utf-8"); br = newBufferedReader(isr); String line = null; while((line = br.readLine()) != null) { manager.save(name + "客户输入:"+ line); } System.out.println("接受完毕"); } catch(UnsupportedEncodingException e) { e.printStackTrace(); } catch(IOException e) { e.printStackTrace(); } finally{ try{ if(br != null) { br.close(); } if(isr != null) { isr.close(); } } catch(IOException e) { e.printStackTrace(); } } } } packagecom.sunhaojie.learntest.thirteenth; importjava.io.IOException; importjava.net.ServerSocket; importjava.net.Socket; /** * * @ClassName SocketServerTest * @Description 服务器端测试类 * * @author sunhaojie [email protected] * @date 2016年2月12日 下午10:53:45 */ publicclassSocketServerTest { @SuppressWarnings({ "static-access", "resource"}) publicstaticvoidmain(String[] args) throwsIOException { ServerSocket serverSocket = newServerSocket(8080); System.out.println("监听IP:"+ serverSocket.getInetAddress().getLocalHost().getHostAddress() + "; 监听端口:" + serverSocket.getLocalPort()); inti = 0; ChatLogManager manager = newChatLogManager(); // 主线程一直在等待客户端连接 while(true) { Socket acceptSocket = serverSocket.accept(); // 接受到一个客户请求就会创建一个新线程处理 String sessionName = "session"+ i++; Thread thread = newSessionProcessor(sessionName, acceptSocket, manager); thread.start(); } } } packagecom.sunhaojie.learntest.thirteenth; importjava.io.BufferedWriter; importjava.io.IOException; importjava.io.OutputStream; importjava.io.OutputStreamWriter; importjava.net.Socket; importjava.net.UnknownHostException; /** * @ClassName SenderProcessor * @Description TODO * * @author sunhaojie [email protected] * @date 2016年2月13日 下午10:22:36 */ publicclassSenderProcessor extendsThread { @Override publicvoidrun() { try{ Socket socket = newSocket("127.0.0.1", 8080); OutputStream outputStream = socket.getOutputStream(); BufferedWriter writer = newBufferedWriter(newOutputStreamWriter(outputStream)); for(inti = 0; i < 10; i++) { writer.write("发送的第"+ i + "句话"); writer.newLine(); writer.flush(); } writer.close(); outputStream.close(); socket.close(); System.out.println("输出完毕"); } catch(UnknownHostException e) { e.printStackTrace(); } catch(IOException e) { e.printStackTrace(); } } } packagecom.sunhaojie.learntest.thirteenth; /** * * @ClassName SocketClientTest * @Description 测试端输出类 * * @author sunhaojie [email protected] * @date 2016年2月12日 下午10:54:03 */ publicclassSocketClientTest { publicstaticvoidmain(String[] args) { for(inti = 0; i < 10; i++) { Thread thread = newSenderProcessor(); thread.start(); } } }
建议继续学习:
扫一扫订阅我的微信号:IT技术博客大学习
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK